聊聊CanalEventSink

代码骑士
• 阅读 1606

本文主要研究一下CanalEventSink

CanalEventSink

canal-1.1.4/sink/src/main/java/com/alibaba/otter/canal/sink/CanalEventSink.java

public interface CanalEventSink<T> extends CanalLifeCycle {

    /**
     * 提交数据
     * 
     * @param event
     * @param remoteAddress
     * @param destination
     * @throws CanalSinkException
     * @throws InterruptedException
     */
    boolean sink(T event, InetSocketAddress remoteAddress, String destination) throws CanalSinkException,
                                                                              InterruptedException;

    /**
     * 中断消费,比如解析模块发生了切换,想临时中断当前的merge请求,清理对应的上下文状态,可见{@linkplain GroupEventSink}
     */
    void interrupt();

}
  • CanalEventSink继承了CanalLifeCycle,它定义了sink、interrupt接口

AbstractCanalEventSink

canal-1.1.4/sink/src/main/java/com/alibaba/otter/canal/sink/AbstractCanalEventSink.java

public abstract class AbstractCanalEventSink<T> extends AbstractCanalLifeCycle implements CanalEventSink<T> {

    protected CanalEventFilter                  filter;
    protected List<CanalEventDownStreamHandler> handlers = new ArrayList<CanalEventDownStreamHandler>();

    public void setFilter(CanalEventFilter filter) {
        this.filter = filter;
    }

    public void addHandler(CanalEventDownStreamHandler handler) {
        this.handlers.add(handler);
    }

    public CanalEventDownStreamHandler getHandler(int index) {
        return this.handlers.get(index);
    }

    public void addHandler(CanalEventDownStreamHandler handler, int index) {
        this.handlers.add(index, handler);
    }

    public void removeHandler(int index) {
        this.handlers.remove(index);
    }

    public void removeHandler(CanalEventDownStreamHandler handler) {
        this.handlers.remove(handler);
    }

    public CanalEventFilter getFilter() {
        return filter;
    }

    public List<CanalEventDownStreamHandler> getHandlers() {
        return handlers;
    }

    public void interrupt() {
        // do nothing
    }

}
  • AbstractCanalEventSink继承了AbstractCanalLifeCycle,声明实现了CanalEventSink接口;它定义了filter及handlers两个属性

EntryEventSink

canal-1.1.4/sink/src/main/java/com/alibaba/otter/canal/sink/entry/EntryEventSink.java

public class EntryEventSink extends AbstractCanalEventSink<List<CanalEntry.Entry>> implements CanalEventSink<List<CanalEntry.Entry>> {

    private static final Logger    logger                        = LoggerFactory.getLogger(EntryEventSink.class);
    private static final int       maxFullTimes                  = 10;
    private CanalEventStore<Event> eventStore;
    protected boolean              filterTransactionEntry        = false;                                        // 是否需要尽可能过滤事务头/尾
    protected boolean              filterEmtryTransactionEntry   = true;                                         // 是否需要过滤空的事务头/尾
    protected long                 emptyTransactionInterval      = 5 * 1000;                                     // 空的事务输出的频率
    protected long                 emptyTransctionThresold       = 8192;                                         // 超过8192个事务头,输出一个

    protected volatile long        lastTransactionTimestamp      = 0L;
    protected AtomicLong           lastTransactionCount          = new AtomicLong(0L);
    protected volatile long        lastEmptyTransactionTimestamp = 0L;
    protected AtomicLong           lastEmptyTransactionCount     = new AtomicLong(0L);
    protected AtomicLong           eventsSinkBlockingTime        = new AtomicLong(0L);
    protected boolean              raw;

    public EntryEventSink(){
        addHandler(new HeartBeatEntryEventHandler());
    }

    public void start() {
        super.start();
        Assert.notNull(eventStore);

        if (eventStore instanceof MemoryEventStoreWithBuffer) {
            this.raw = ((MemoryEventStoreWithBuffer) eventStore).isRaw();
        }

        for (CanalEventDownStreamHandler handler : getHandlers()) {
            if (!handler.isStart()) {
                handler.start();
            }
        }
    }

    public void stop() {
        super.stop();

        for (CanalEventDownStreamHandler handler : getHandlers()) {
            if (handler.isStart()) {
                handler.stop();
            }
        }
    }

    public boolean sink(List<CanalEntry.Entry> entrys, InetSocketAddress remoteAddress, String destination) {
        return sinkData(entrys, remoteAddress);
    }

    //......
}
  • EntryEventSink继承了AbstractCanalEventSink,声明实现了CanalEventSink,其start方法会遍历handlers,挨个执行handler.start方法;其stop方法会遍历handlers,挨个执行handler.stop方法;其sink方法执行的是sinkData方法

sinkData

canal-1.1.4/sink/src/main/java/com/alibaba/otter/canal/sink/entry/EntryEventSink.java

public class EntryEventSink extends AbstractCanalEventSink<List<CanalEntry.Entry>> implements CanalEventSink<List<CanalEntry.Entry>> {

    //......

    private boolean sinkData(List<CanalEntry.Entry> entrys, InetSocketAddress remoteAddress)
                                                                                            throws InterruptedException {
        boolean hasRowData = false;
        boolean hasHeartBeat = false;
        List<Event> events = new ArrayList<Event>();
        for (CanalEntry.Entry entry : entrys) {
            if (!doFilter(entry)) {
                continue;
            }

            if (filterTransactionEntry
                && (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND)) {
                long currentTimestamp = entry.getHeader().getExecuteTime();
                // 基于一定的策略控制,放过空的事务头和尾,便于及时更新数据库位点,表明工作正常
                if (lastTransactionCount.incrementAndGet() <= emptyTransctionThresold
                    && Math.abs(currentTimestamp - lastTransactionTimestamp) <= emptyTransactionInterval) {
                    continue;
                } else {
                    lastTransactionCount.set(0L);
                    lastTransactionTimestamp = currentTimestamp;
                }
            }

            hasRowData |= (entry.getEntryType() == EntryType.ROWDATA);
            hasHeartBeat |= (entry.getEntryType() == EntryType.HEARTBEAT);
            Event event = new Event(new LogIdentity(remoteAddress, -1L), entry, raw);
            events.add(event);
        }

        if (hasRowData || hasHeartBeat) {
            // 存在row记录 或者 存在heartbeat记录,直接跳给后续处理
            return doSink(events);
        } else {
            // 需要过滤的数据
            if (filterEmtryTransactionEntry && !CollectionUtils.isEmpty(events)) {
                long currentTimestamp = events.get(0).getExecuteTime();
                // 基于一定的策略控制,放过空的事务头和尾,便于及时更新数据库位点,表明工作正常
                if (Math.abs(currentTimestamp - lastEmptyTransactionTimestamp) > emptyTransactionInterval
                    || lastEmptyTransactionCount.incrementAndGet() > emptyTransctionThresold) {
                    lastEmptyTransactionCount.set(0L);
                    lastEmptyTransactionTimestamp = currentTimestamp;
                    return doSink(events);
                }
            }

            // 直接返回true,忽略空的事务头和尾
            return true;
        }
    }

    protected boolean doSink(List<Event> events) {
        for (CanalEventDownStreamHandler<List<Event>> handler : getHandlers()) {
            events = handler.before(events);
        }
        long blockingStart = 0L;
        int fullTimes = 0;
        do {
            if (eventStore.tryPut(events)) {
                if (fullTimes > 0) {
                    eventsSinkBlockingTime.addAndGet(System.nanoTime() - blockingStart);
                }
                for (CanalEventDownStreamHandler<List<Event>> handler : getHandlers()) {
                    events = handler.after(events);
                }
                return true;
            } else {
                if (fullTimes == 0) {
                    blockingStart = System.nanoTime();
                }
                applyWait(++fullTimes);
                if (fullTimes % 100 == 0) {
                    long nextStart = System.nanoTime();
                    eventsSinkBlockingTime.addAndGet(nextStart - blockingStart);
                    blockingStart = nextStart;
                }
            }

            for (CanalEventDownStreamHandler<List<Event>> handler : getHandlers()) {
                events = handler.retry(events);
            }

        } while (running && !Thread.interrupted());
        return false;
    }

    //......

}
  • sinkData方法遍历entrys,通过doFilter过滤掉一些entry,之后将entry转换为Event添加到events中,之后执行doSink方法;doSink方法遍历handlers,挨个执行handler.before(events),之后执行eventStore.tryPut(events),然后遍历handlers,挨个执行handler.after(events);若tryPut不成功,则遍历handlers,挨个执行handler.retry(events)

小结

CanalEventSink继承了CanalLifeCycle,它定义了sink、interrupt接口;AbstractCanalEventSink继承了AbstractCanalLifeCycle,声明实现了CanalEventSink接口;它定义了filter及handlers两个属性;EntryEventSink继承了AbstractCanalEventSink,声明实现了CanalEventSink,其start方法会遍历handlers,挨个执行handler.start方法;其stop方法会遍历handlers,挨个执行handler.stop方法;其sink方法执行的是sinkData方法

doc

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
美凌格栋栋酱 美凌格栋栋酱
6个月前
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(
Wesley13 Wesley13
3年前
java8 List集合的排序,求和,取最大值,按照条件过滤
public class Java8Test{public static void main(Stringargs){Personp1 new Person("麻子", 31);Personp2 new Person("
Wesley13 Wesley13
3年前
MySQL如何实时同步数据到ES?试试这款阿里开源的神器
摘要mall项目中的商品搜索功能,一直都没有做实时数据同步。最近发现阿里巴巴开源的canal可以把MySQL中的数据实时同步到Elasticsearch中,能很好地解决数据同步问题。今天我们来讲讲canal的使用,希望对大家有所帮助!canal简介canal主要用途是对MySQL数据库增量日志进行解析,提供增量数据的订阅和消
Stella981 Stella981
3年前
Canal简介及配置说明
1.简介canal是纯Java开发的,基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了mysql。原理相对比较简单:1.1.canal模拟mysqlslave的交互协议,伪装自己为mysqlslave,向mysqlmaster发送dump协议2.2.mysqlmaster收到dump请
Stella981 Stella981
3年前
Canal & Otter 的一些注意事项和最佳实践
1,canal和otter由于是java开发的,运行在windows和linux上都可以2,为了使用otter必须要canal的支持,otter作为canal的消费方,当然也可以单独使用canal,如果你有消费mysqlbinlog的需求3,canal有几种运行方式,生产环境中推荐使用zookeeper的持久化方式,对应的spring配置文件为:d
Wesley13 Wesley13
3年前
Canal+Otter
数据库同步中间件CanalOtter前日篇(2)MySQLInnoDB架构体系!这里写图片描述(https://static.oschina.net/uploads/img/201712/13102527_0Qct.jpg)MySQL体系前
Wesley13 Wesley13
3年前
Canal+Otter
数据库同步中间件CanalOtter前日篇(2)MySQLInnoDB架构体系!这里写图片描述(http://static.oschina.net/uploads/img/201604/24113828_GCwY.jpg)MySQL体系前端
Stella981 Stella981
3年前
Flink的sink实战之二:kafka
欢迎访问我的GitHubhttps://github.com/zq2599/blog\_demos内容:所有原创文章分类汇总及配套源码,涉及Java、Docker、Kubernetes、DevOPS等;本篇概览本文是《Flink的sink实战》系列的第二篇,前文《Flink的sink实战之一:初探》对sink有了基本的了解,本
Stella981 Stella981
3年前
Canal 组件简介与 vivo 帐号实践
互联网应用随着业务的发展,部分单表数据体量越来越大,应对服务性能与稳定的考虑,有做分库分表、数据迁移的需要,本文介绍了vivo帐号应对以上需求的实践。一、前言Canal是阿里巴巴开源项目,关于什么是Canal?又能做什么?我会在后文为大家一一介绍。在本文您将可以了解到vivo帐号使用Canal解决了什么样的业务痛点,基于此希望
Stella981 Stella981
3年前
Canal使用报错解决办法
1、 \destinationtest\_cancal,address/127.0.0.1:3306,EventParser\WARNc.a.o.s.a.i.setl.zookeeper.termin.WarningTerminProcessnid:1\1:canal:test\_cancal:java.lang.Null