ActiveMQ服务器数据迁移的一点心得

Wesley13
• 阅读 1162

     ActiveMQ实现Master-Slave的方式有几种,一种是使用Jdbc Peristent的方式,未被Consumer读取的消息和死消息写在数据库的ActiveMQ_MSGS表中,还有一种是使用LevelDB文件,这些消息将被存储在Level DB的.log文件中。

    现在我们项目有一个需求是将原先使用Jdbc Persistent的ActiveMQ转换为使用Zk + Replicated Level DB的ActiveMQ,这就产生了一个问题:如何将原先保存在ActiveMQ_MSGS表的消息导入到新ActiveMQ服务器的LevelDB数据文件中?

    我们查看了使用jdbc peristent的ACTIVEMQ_MSGS表数据(以MySQL数据库为例)

   ActiveMQ服务器数据迁移的一点心得

  可以看出存储在数据库里的消息是以Blob形式保存的。

  使用leveldb形式保存的消息数据的形式是二进制形式的

ActiveMQ服务器数据迁移的一点心得

   我们可以看出两者的数据存储格式完全不一样,因此不可能像jdbc peristent迁移那样,将activemq_msgs表数据从一个数据库迁移到另一个数据库那样简单。只能采用从源ActiveMQ服务器读取队列消息,再将队列消息发送到目标ActiveMQ的方式。

   我们搭建了两个CentOS虚拟机,虚拟机A上是源ActiveMQ,使用jdbc persistent(MySQL),有三个Queue:TestQueue1,TestQueue2,TestQueue3.

   虚拟机B使用LevelDB做数据持久化。

1.单ActiveMQ,单队列消息迁移

  ActiveMQ服务器数据迁移的一点心得

  TestQueue1有三条持久化消息

  ActiveMQ服务器数据迁移的一点心得

   消息的内容我们可以使用activemq-admin.bat查看(这个只在windows环境下可用),这个命令支持获取远程ActiveMQ信息。

activemq-admin.bat browse --amqurl tcp://xxx.xxx.xxx.xxx:61616 TestQueue1

   获得的TestQueue1队列信息如下所示  

JMS_HEADER_FIELD:JMSDestination = TestQueue1
JMS_BODY_FIELD:JMSText = Test Queue1's first message.
JMS_HEADER_FIELD:JMSType =
JMS_HEADER_FIELD:JMSCorrelationID =
JMS_HEADER_FIELD:JMSDeliveryMode = persistent
JMS_HEADER_FIELD:JMSMessageID = ID:rickhunter.domain-46264-1476266938779-4:1:1:1:1
JMS_HEADER_FIELD:JMSExpiration = 0
JMS_HEADER_FIELD:JMSPriority = 0
JMS_HEADER_FIELD:JMSRedelivered = false
JMS_HEADER_FIELD:JMSTimestamp = 1476267006371

JMS_HEADER_FIELD:JMSDestination = TestQueue1
JMS_BODY_FIELD:JMSText = TestQueue's second message
JMS_HEADER_FIELD:JMSType =
JMS_HEADER_FIELD:JMSCorrelationID =
JMS_HEADER_FIELD:JMSDeliveryMode = persistent
JMS_HEADER_FIELD:JMSMessageID = ID:rickhunter.domain-44431-1476325780609-4:1:1:1:1
JMS_HEADER_FIELD:JMSExpiration = 0
JMS_HEADER_FIELD:JMSPriority = 0
JMS_HEADER_FIELD:JMSRedelivered = false
JMS_HEADER_FIELD:JMSTimestamp = 1476326826475

JMS_HEADER_FIELD:JMSDestination = TestQueue1
JMS_BODY_FIELD:JMSText = TestQueue1's third message.
JMS_HEADER_FIELD:JMSType =
JMS_HEADER_FIELD:JMSCorrelationID =
JMS_HEADER_FIELD:JMSDeliveryMode = persistent
JMS_HEADER_FIELD:JMSMessageID = ID:rickhunter.domain-44431-1476325780609-4:1:1:1:2
JMS_HEADER_FIELD:JMSExpiration = 0
JMS_HEADER_FIELD:JMSPriority = 0
JMS_HEADER_FIELD:JMSRedelivered = false
JMS_HEADER_FIELD:JMSTimestamp = 1476326873071

   我们数据迁移的基本思路,是从虚拟机A的ActiveMQ上读取TestQueue1的持久化消息,发送给虚拟机B的ActiveMQ的TestQueue1.

   读取TestQueue1队列消息的代码片段如下(这段代码参考了王新春的代码, 根据实际做了一点修改,特此感谢。http://wangxinchun.iteye.com/blog/2146120?utm\_source=tuicool&utm\_medium=referral)

String url = "tcp://xxx.xxx.xxx.xxx:61616";
        ActiveMQConnectionFactory connectionFactory =
                new ActiveMQConnectionFactory("admin", "admin", url);

        ActiveMQConnection connection = null;
        try {
            connection = (ActiveMQConnection)connectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue destination = session.createQueue("TestQueue1");
            QueueBrowser browser = session.createBrowser(destination);
            Enumeration<?> enumeration = browser.getEnumeration();
            while (enumeration.hasMoreElements()) {
                TextMessage message = (TextMessage) enumeration.nextElement();
                System.out.println("Browsing: " + message.getText());                
            }
            session.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
        finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }

    上述代码只是从源ActiveMQ的TestQueue1读取所有持久化消息并将每条消息正文输出到控制台,接下来我们对这段代码进行改造,将获取的消息发送到目标ActiveMQ的TestQueue1。

    我们将每一条消息放到一个消息List中。然后连接目标ActiveMQ,将消息List中的每一条消息发送到TestQueue1。代码片段如下:  

List<TextMessage> messageList = new ArrayList<TextMessage>();

..........
Enumeration<?> enumeration = browser.getEnumeration();
while (enumeration.hasMoreElements()) 
{
    TextMessage message = (TextMessage) enumeration.nextElement();
    messageList.add(message);
}           
..........

sendMessages(messageList);
..........


 private static void sendMessages(List<TextMessage> messageList)
    {
        try
        {
            String sendActiveMQUrl = "tcp://xxx.xxx.xxx.xxx:61616";
            ActiveMQConnectionFactory connectionFactory =
                    new ActiveMQConnectionFactory(sendActiveMQUrl);
            connectionFactory.setUserName("admin");
            connectionFactory.setPassword("admin");
            ActiveMQConnection connection = (ActiveMQConnection)connectionFactory.createConnection();

            connection.setDisableTimeStampsByDefault(true);
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            //创建目标,就创建主题也可以创建队列
            Destination destination = session.createQueue("TestQueue1");
            //创建消息生产者
            MessageProducer producer = session.createProducer(destination);
            //设置持久化,DeliveryMode.PERSISTENT和DeliveryMode.NON_PERSISTENT
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);

            for(TextMessage message:messageList)
            {
                // 发送消息到ActiveMQ
                producer.send(message);
                TimeUnit.MILLISECONDS.sleep(10);
            }

            // 关闭资源
            session.close();
            connection.close();
            System.out.println("The messages import is completed.");
        }
        catch(Exception ex)
        {
            ex.printStackTrace();
        }
    }

上述代码中有一句代码请特别注意

connection.setDisableTimeStampsByDefault(true);

这句代码是控制是否使用JmsMessage本身自带的TimeStamp属性。ActiveMQConnection类的disableTimeStampsByDefault属性的API描述是这样的(基于ActiveMQ 5.14.0的代码)

 /**
     * Sets whether or not timestamps on messages should be disabled or not. If
     * you disable them it adds a small performance boost.
     */
    public void setDisableTimeStampsByDefault(boolean timeStampsDisableByDefault) {
        this.disableTimeStampsByDefault = timeStampsDisableByDefault;
    }

从API描述看出来,它是设置JMS Message自身所带的timestamp是否被禁用。具体使用这个属性的代码是在ActiveMQConnection的send方法

 void send(ActiveMQDestination destination, ActiveMQMessage msg, MessageId messageId, int deliveryMode, 
int priority, long timeToLive, boolean async) throws JMSException 
{
  ...............
   if (!isDisableTimeStampsByDefault()) {
            long timeStamp = System.currentTimeMillis();
            msg.setJMSTimestamp(timeStamp);
            if (timeToLive > 0) {
                expiration = timeToLive + timeStamp;
            }
        }
  ...............
}

从上述代码可以看出,如果disableTimeStampsByDefault属性是false,ActiveMQ在发送Jms Message对象时,将无视消息对象自带的timestamp,使用JmsMessage发送时的时间。这样对于Jms Message对象timestamp属性敏感的应用就会出现问题,而设置这个属性为true,将保留Jms Message对象原先的timestamp信息。

补充一点,如果不设置ActiveMQConnection的disableTimeStampsByDefault属性,设置MessageProducer的disableMessageTimestamp属性可以起到相同的效果。MessageProducer的disableMessageTimestamp属性的API描述是

public abstract class ActiveMQMessageProducerSupport implements MessageProducer, Closeable {
...............
 /**
     * Sets whether message timestamps are disabled.
     * <P>
     * Since timestamps take some effort to create and increase a message's
     * size, some JMS providers may be able to optimize message overhead if
     * they are given a hint that the timestamp is not used by an application.
     * By calling the <CODE>setDisableMessageTimestamp</CODE> method on this
     * message producer, a JMS client enables this potential optimization for
     * all messages sent by this message producer. If the JMS provider accepts
     * this hint, these messages must have the timestamp set to zero; if the
     * provider ignores the hint, the timestamp must be set to its normal
     * value.
     * <P>
     * Message timestamps are enabled by default.
     *
     * @param value indicates if message timestamps are disabled
     * @throws javax.jms.JMSException if the JMS provider fails to close the producer due to
     *                      some internal error.
     */
    public void setDisableMessageTimestamp(boolean value) throws JMSException
    {
      ............
    }

当MessageProducer在发送消息时使用到了这个属性

public class ActiveMQSession implements Session, QueueSession, 
   TopicSession, StatsCapable, ActiveMQDispatcher 
{
  ..............
 protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, 
                     Message message,int deliveryMode, int priority, long timeToLive,
                     MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) 
                     throws JMSException
 {
  ..............
   synchronized (sendMutex) {
   ..............
   if (!producer.getDisableMessageTimestamp()) 
   {
        long timeStamp = System.currentTimeMillis();
        message.setJMSTimestamp(timeStamp);
        if (timeToLive > 0) 
        { 
           expiration = timeToLive + timeStamp;
        }
   }
   .......
 }

对照ActiveMQConnection的send方法,可以看出两段代码的作用完全一样,因此我们可以设置MessageProducer的disableMessageTimestamp属性达到相同效果,具体修改代码如下:

ActiveMQMessageProducer producer = (ActiveMQMessageProducer)session.createProducer(destination);
producer.setDisableMessageTimestamp(true);

代码修改完成后运行,我们可以看到目标ActiveMQ的TestQueue1中已经有迁移的三条消息。

ActiveMQ服务器数据迁移的一点心得

JMS_HEADER_FIELD:JMSDestination = TestQueue1
JMS_BODY_FIELD:JMSText = Test Queue1's first message.
JMS_HEADER_FIELD:JMSType =
JMS_HEADER_FIELD:JMSCorrelationID =
JMS_HEADER_FIELD:JMSDeliveryMode = persistent
JMS_HEADER_FIELD:JMSMessageID = ID:rick.domain-4065-1476339886463-3:1:1:1:1
JMS_HEADER_FIELD:JMSExpiration = 0
JMS_HEADER_FIELD:JMSPriority = 4
JMS_HEADER_FIELD:JMSRedelivered = false
JMS_HEADER_FIELD:JMSTimestamp = 1476267006371

JMS_HEADER_FIELD:JMSDestination = TestQueue1
JMS_BODY_FIELD:JMSText = TestQueue's second message
JMS_HEADER_FIELD:JMSType =
JMS_HEADER_FIELD:JMSCorrelationID =
JMS_HEADER_FIELD:JMSDeliveryMode = persistent
JMS_HEADER_FIELD:JMSMessageID = ID:rick.domain-4065-1476339886463-3:1:1:1:2
JMS_HEADER_FIELD:JMSExpiration = 0
JMS_HEADER_FIELD:JMSPriority = 4
JMS_HEADER_FIELD:JMSRedelivered = false
JMS_HEADER_FIELD:JMSTimestamp = 1476326826475

JMS_HEADER_FIELD:JMSDestination = TestQueue1
JMS_BODY_FIELD:JMSText = TestQueue1's third message.
JMS_HEADER_FIELD:JMSType =
JMS_HEADER_FIELD:JMSCorrelationID =
JMS_HEADER_FIELD:JMSDeliveryMode = persistent
JMS_HEADER_FIELD:JMSMessageID = ID:rick.domain-4065-1476339886463-3:1:1:1:3
JMS_HEADER_FIELD:JMSExpiration = 0
JMS_HEADER_FIELD:JMSPriority = 4
JMS_HEADER_FIELD:JMSRedelivered = false
JMS_HEADER_FIELD:JMSTimestamp = 1476326873071

与源ActiveMQ的三条消息比较,除了JMSMessageID属性不一样以外(MessageId是由发送Message的Client机器ID和随机数生成的,不可能保证和原消息完全一致),其他属性完全一致。如果要保留源消息的MessageId信息,可以将其设置到JMSCorrelationID属性。

2.单ActiveMQ,多队列消息迁移

在1中我们实现单队列消息的迁移。但如果源ActiveMQ有多个队列,每个队列都有持久化消息,如何实现这些队列消息的迁移?

     我们最初的思路是将所有包含持久化消息的队列名称写入一个配置文件里,在迁移的时候读取这些队列名称。但是这样做的问题就是不灵活,有可能在迁移时一些原先没有持久化消息的队列包含了消息,一些原先有持久化消息的队列的消息被消费掉了,使得不是所有队列的消息都能被迁移。

    通过查看ActiveMQ的API文档,我们决定使用ActiveMQConnection类的destinationSource属性,这个属性的API描述是   

 /**
     * Returns the {@link DestinationSource} object which can be used to listen to destinations
     * being created or destroyed or to enquire about the current destinations available on the broker
     *
     * @return a lazily created destination source
     * @throws JMSException
     */
   @Override
    public DestinationSource getDestinationSource() throws JMSException {
    .......
   }

从API描述上可以看出这个属性是一个源对象,是对当前ActiveMQ连接上的所有Destination(包括Queue和Topic)的一个侦听对象。从DestinationSource对象,我们可以获取源ActiveMQ的所有Queue(也可以获得所有Topic,这里不再赘述).

public class DestinationSource implements MessageListener { 

  ..............
/**
     * Returns the current queues available on the broker
     */
    public Set<ActiveMQQueue> getQueues() {
        return queues;
    }
  ..............
}

获得了源ActiveMQ的Queue集合,接下来的操作就和1一样了,只是过滤掉那些不不包含持久化消息的Queue。实现代码的片段如下:

HashMap<String, List<TextMessage>> queueMessageMap =
                new HashMap<String, List<TextMessage>>();
.................
Set<ActiveMQQueue> activeMQQueues =
                    connection.getDestinationSource().getQueues();
for(ActiveMQQueue destination: activeMQQueues) 
{
    QueueBrowser browser = session.createBrowser(destination);
    Enumeration<?> enumeration = browser.getEnumeration();
    List<TextMessage> messageList = new ArrayList<TextMessage>();
    while (enumeration.hasMoreElements()) {
        TextMessage message = (TextMessage) enumeration.nextElement();
        messageList.add(message);
}

if (messageList.size() > 0)
{
    queueMessageMap.put(destination.getQueueName(), messageList);
}
 
.................
if(queueMessageMap.size() > 0)
{
    sendMessages(queueMessageMap);
}
.................

private static void sendMessages(HashMap<String, List<TextMessage>> queueMessageMap)
{
  .................
  for(String queueName:queueMessageMap.keySet())
  {
       // 创建目标,就创建主题也可以创建队列
    Destination destination = session.createQueue(queueName);
    // 创建消息生产者
    MessageProducer producer = session.createProducer(destination);
    // 设置持久化,DeliveryMode.PERSISTENT和DeliveryMode.NON_PERSISTENT
    producer.setDeliveryMode(DeliveryMode.PERSISTENT);
    List<TextMessage> messageList = queueMessageMap.get(queueName);

    for(TextMessage message:messageList)
    {
        // 发送消息到ActiveMQ
        producer.send(message);
        TimeUnit.MILLISECONDS.sleep(10);
    }            
  }

我们在源ActiveMQ的TestQueue1,TestQueue2,TestQueue3队列里分别创建三条持久化消息(如下图所示)

ActiveMQ服务器数据迁移的一点心得

删除目标ActiveMQ TestQueue1队列里1中迁移的三条消息

ActiveMQ服务器数据迁移的一点心得

运行代码,我们可以看到源ActiveMQ中的三个队列里的九条消息被迁移到了目标ActiveMQ

ActiveMQ服务器数据迁移的一点心得

3.多ActiveMQ,多队列消息迁移

在这个场景中目标ActiveMQ是使用zk + Replicated LevelDB结构的三台ActiveMQ。代码仍然使用和2相同的代码,只是连接目标ActiveMQ的brokerUrl形式改为failover:(tcp://xxx.xxx.xxx, tcp://xxx.xxx.xxx, tcp://xxx.xxx.xxx)

我们使用的三台ActiveMQ的清单如下:

ActiveMQ服务器号

tcp端口号

zk端口号

管理界面端口号

ActiveMQ-1

61616

2181

8161

ActiveMQ-2

62616

2181

8261

ActiveMQ-3

63616

2181

8361

最初三台ActiveMQ中没有队列,也没有消息(此时ActiveMQ-1为Master,ActiveMQ-2,ActiveMQ-3为Slave)

ActiveMQ服务器数据迁移的一点心得

运行2的程序后,我们可以看到ActiveMQ-1中有了TestQueue1,TestQueue2,TestQueue3三个队列,并且有了9条消息。

ActiveMQ服务器数据迁移的一点心得

当我们停掉ActiveMQ-1后,ActiveMQ-2成为Master,我们可以从它的管理界面看到ActiveMQ-2也有三个队列,9条消息。

ActiveMQ服务器数据迁移的一点心得

这说明2的程序对于Zookeeper + Replicated LevelDB架构的多ActiveMQ 数据迁移仍然有效。

4.结语

本文实现了从数据库介质的ActiveMQ到使用Replicated LevelDB文件介质的ActiveMQ的消息迁移,但并不仅限于此种场景,任何两个ActiveMQ之间都可以实现类似的消息迁移。在实际运行中,还可以使用这样的程序实现不同ActiveMQ之间的数据同步,以及在一个ActiveMQ内,消息在不同队列间的迁移。这需要开发者根据实际需要进行修改和调整。

点赞
收藏
评论区
推荐文章
blmius blmius
2年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Easter79 Easter79
2年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
2年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
2年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
2年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
2年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
为什么mysql不推荐使用雪花ID作为主键
作者:毛辰飞背景在mysql中设计表的时候,mysql官方推荐不要使用uuid或者不连续不重复的雪花id(long形且唯一),而是推荐连续自增的主键id,官方的推荐是auto_increment,那么为什么不建议采用uuid,使用uuid究
Python进阶者 Python进阶者
4个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这