聊聊CanalMQStarter

代码紫霄使
• 阅读 1257

本文主要研究一下CanalMQStarter

CanalMQStarter

canal-1.1.4/server/src/main/java/com/alibaba/otter/canal/server/CanalMQStarter.java

public class CanalMQStarter {

    private static final Logger          logger         = LoggerFactory.getLogger(CanalMQStarter.class);

    private volatile boolean             running        = false;

    private ExecutorService              executorService;

    private CanalMQProducer              canalMQProducer;

    private MQProperties                 properties;

    private CanalServerWithEmbedded      canalServer;

    private Map<String, CanalMQRunnable> canalMQWorks   = new ConcurrentHashMap<>();

    private static Thread                shutdownThread = null;

    public CanalMQStarter(CanalMQProducer canalMQProducer){
        this.canalMQProducer = canalMQProducer;
    }

    public synchronized void start(MQProperties properties, String destinations) {
        try {
            if (running) {
                return;
            }
            this.properties = properties;
            canalMQProducer.init(properties);
            // set filterTransactionEntry
            if (properties.isFilterTransactionEntry()) {
                System.setProperty("canal.instance.filter.transaction.entry", "true");
            }

            canalServer = CanalServerWithEmbedded.instance();

            // 对应每个instance启动一个worker线程
            executorService = Executors.newCachedThreadPool();
            logger.info("## start the MQ workers.");

            String[] dsts = StringUtils.split(destinations, ",");
            for (String destination : dsts) {
                destination = destination.trim();
                CanalMQRunnable canalMQRunnable = new CanalMQRunnable(destination);
                canalMQWorks.put(destination, canalMQRunnable);
                executorService.execute(canalMQRunnable);
            }

            running = true;
            logger.info("## the MQ workers is running now ......");

            shutdownThread = new Thread() {

                public void run() {
                    try {
                        logger.info("## stop the MQ workers");
                        running = false;
                        executorService.shutdown();
                        canalMQProducer.stop();
                    } catch (Throwable e) {
                        logger.warn("##something goes wrong when stopping MQ workers:", e);
                    } finally {
                        logger.info("## canal MQ is down.");
                    }
                }

            };

            Runtime.getRuntime().addShutdownHook(shutdownThread);
        } catch (Throwable e) {
            logger.error("## Something goes wrong when starting up the canal MQ workers:", e);
        }
    }

    public synchronized void destroy() {
        running = false;
        if (executorService != null) {
            executorService.shutdown();
        }
        if (canalMQProducer != null) {
            canalMQProducer.stop();
        }
        if (shutdownThread != null) {
            Runtime.getRuntime().removeShutdownHook(shutdownThread);
            shutdownThread = null;
        }
    }

    //......
}
  • CanalMQStarter提供了start、destroy方法;其start方法使用MQProperties来初始化canalMQProducer,然后通过CanalServerWithEmbedded.instance()获取canalServer,之后遍历destinations,创建canalMQRunnable提交给executorService执行,最后注册了shutdownThread,在jvm关闭时执行executorService.shutdown()及canalMQProducer.stop();其destroy方法也是执行executorService.shutdown()及canalMQProducer.stop(),它还会从Runtime.getRuntime()的shutdownHook移除shutdownThread

CanalMQRunnable

canal-1.1.4/server/src/main/java/com/alibaba/otter/canal/server/CanalMQStarter.java

    private class CanalMQRunnable implements Runnable {

        private String destination;

        CanalMQRunnable(String destination){
            this.destination = destination;
        }

        private AtomicBoolean running = new AtomicBoolean(true);

        @Override
        public void run() {
            worker(destination, running);
        }

        public void stop() {
            running.set(false);
        }
    }
  • CanalMQRunnable实现了Runnable接口,其run方法执行worker(destination, running)

worker

canal-1.1.4/server/src/main/java/com/alibaba/otter/canal/server/CanalMQStarter.java

public class CanalMQStarter {

    //......

    private void worker(String destination, AtomicBoolean destinationRunning) {
        while (!running || !destinationRunning.get()) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                // ignore
            }
        }

        logger.info("## start the MQ producer: {}.", destination);
        MDC.put("destination", destination);
        final ClientIdentity clientIdentity = new ClientIdentity(destination, (short) 1001, "");
        while (running && destinationRunning.get()) {
            try {
                CanalInstance canalInstance = canalServer.getCanalInstances().get(destination);
                if (canalInstance == null) {
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        // ignore
                    }
                    continue;
                }
                MQProperties.CanalDestination canalDestination = new MQProperties.CanalDestination();
                canalDestination.setCanalDestination(destination);
                CanalMQConfig mqConfig = canalInstance.getMqConfig();
                canalDestination.setTopic(mqConfig.getTopic());
                canalDestination.setPartition(mqConfig.getPartition());
                canalDestination.setDynamicTopic(mqConfig.getDynamicTopic());
                canalDestination.setPartitionsNum(mqConfig.getPartitionsNum());
                canalDestination.setPartitionHash(mqConfig.getPartitionHash());

                canalServer.subscribe(clientIdentity);
                logger.info("## the MQ producer: {} is running now ......", destination);

                Long getTimeout = properties.getCanalGetTimeout();
                int getBatchSize = properties.getCanalBatchSize();
                while (running && destinationRunning.get()) {
                    Message message;
                    if (getTimeout != null && getTimeout > 0) {
                        message = canalServer.getWithoutAck(clientIdentity,
                            getBatchSize,
                            getTimeout,
                            TimeUnit.MILLISECONDS);
                    } else {
                        message = canalServer.getWithoutAck(clientIdentity, getBatchSize);
                    }

                    final long batchId = message.getId();
                    try {
                        int size = message.isRaw() ? message.getRawEntries().size() : message.getEntries().size();
                        if (batchId != -1 && size != 0) {
                            canalMQProducer.send(canalDestination, message, new CanalMQProducer.Callback() {

                                @Override
                                public void commit() {
                                    canalServer.ack(clientIdentity, batchId); // 提交确认
                                }

                                @Override
                                public void rollback() {
                                    canalServer.rollback(clientIdentity, batchId);
                                }
                            }); // 发送message到topic
                        } else {
                            try {
                                Thread.sleep(100);
                            } catch (InterruptedException e) {
                                // ignore
                            }
                        }

                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    }
                }
            } catch (Exception e) {
                logger.error("process error!", e);
            }
        }
    }

    //......
}
  • worker方法创建ClientIdentity,然后根据destination从canalServer.getCanalInstances()获取canalInstance,然后创建canalDestination,之后执行canalServer.subscribe(clientIdentity);然后while循环执行canalServer.getWithoutAck拉取message,通过canalMQProducer.send进行发送

小结

CanalMQStarter提供了start、destroy方法;其start方法使用MQProperties来初始化canalMQProducer,然后通过CanalServerWithEmbedded.instance()获取canalServer,之后遍历destinations,创建canalMQRunnable提交给executorService执行,最后注册了shutdownThread,在jvm关闭时执行executorService.shutdown()及canalMQProducer.stop();其destroy方法也是执行executorService.shutdown()及canalMQProducer.stop(),它还会从Runtime.getRuntime()的shutdownHook移除shutdownThread

doc

点赞
收藏
评论区
推荐文章
blmius blmius
4年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(
Wesley13 Wesley13
4年前
java8 List集合的排序,求和,取最大值,按照条件过滤
public class Java8Test{public static void main(Stringargs){Personp1 new Person("麻子", 31);Personp2 new Person("
Wesley13 Wesley13
4年前
MySQL如何实时同步数据到ES?试试这款阿里开源的神器
摘要mall项目中的商品搜索功能,一直都没有做实时数据同步。最近发现阿里巴巴开源的canal可以把MySQL中的数据实时同步到Elasticsearch中,能很好地解决数据同步问题。今天我们来讲讲canal的使用,希望对大家有所帮助!canal简介canal主要用途是对MySQL数据库增量日志进行解析,提供增量数据的订阅和消
Stella981 Stella981
4年前
Canal简介及配置说明
1.简介canal是纯Java开发的,基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了mysql。原理相对比较简单:1.1.canal模拟mysqlslave的交互协议,伪装自己为mysqlslave,向mysqlmaster发送dump协议2.2.mysqlmaster收到dump请
Stella981 Stella981
4年前
Nginx 反向代理Springboot oAuth https配置方案
Nginx配置方案server{listen80;server_namewww.yourname.com;rewrite^(.)$https://${server_name}$1permanent;}server{
Stella981 Stella981
4年前
Canal & Otter 的一些注意事项和最佳实践
1,canal和otter由于是java开发的,运行在windows和linux上都可以2,为了使用otter必须要canal的支持,otter作为canal的消费方,当然也可以单独使用canal,如果你有消费mysqlbinlog的需求3,canal有几种运行方式,生产环境中推荐使用zookeeper的持久化方式,对应的spring配置文件为:d
Wesley13 Wesley13
4年前
Canal+Otter
数据库同步中间件CanalOtter前日篇(2)MySQLInnoDB架构体系!这里写图片描述(https://static.oschina.net/uploads/img/201712/13102527_0Qct.jpg)MySQL体系前
Wesley13 Wesley13
4年前
Canal+Otter
数据库同步中间件CanalOtter前日篇(2)MySQLInnoDB架构体系!这里写图片描述(http://static.oschina.net/uploads/img/201604/24113828_GCwY.jpg)MySQL体系前端
Stella981 Stella981
4年前
Canal使用报错解决办法
1、 \destinationtest\_cancal,address/127.0.0.1:3306,EventParser\WARNc.a.o.s.a.i.setl.zookeeper.termin.WarningTerminProcessnid:1\1:canal:test\_cancal:java.lang.Null
Stella981 Stella981
4年前
Nginx 反向代理部署NodeJs应用
server {    listen       80 default_server;    server_name  _;    charset koi8r;    access_log  logs/host.access.log  main;     Load