A Look at BinaryLogClient's EventListener

logicflow

This article takes a look at the EventListener of BinaryLogClient.

EventListener

mysql-binlog-connector-java-0.20.1/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java

    public interface EventListener {

        void onEvent(Event event);
    }
  • The EventListener interface defines a single method, onEvent, which is handed each Event
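Because the interface declares only one method, a listener can be written as a lambda or an anonymous class. The sketch below illustrates that shape with stand-in types: SketchEvent and SketchEventListener are hypothetical names invented here so the example compiles on its own, and they are not the library's Event and EventListener.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for the library's Event; hypothetical, carries only a type name.
class SketchEvent {
    final String type;

    SketchEvent(String type) {
        this.type = type;
    }
}

// Stand-in with the same single-method shape as BinaryLogClient.EventListener.
interface SketchEventListener {
    void onEvent(SketchEvent event);
}

class ListenerShapeSketch {

    // Feeds every event to a lambda listener and returns the types it saw, in order.
    static List<String> collectTypes(List<SketchEvent> events) {
        List<String> seen = new ArrayList<>();
        SketchEventListener listener = event -> seen.add(event.type);
        for (SketchEvent event : events) {
            listener.onEvent(event);
        }
        return seen;
    }

    public static void main(String[] args) {
        List<SketchEvent> events = new ArrayList<>();
        events.add(new SketchEvent("WRITE_ROWS"));
        events.add(new SketchEvent("XID"));
        System.out.println(collectTypes(events)); // prints [WRITE_ROWS, XID]
    }
}
```

With the real library, the equivalent listener would be passed to binaryLogClient.registerEventListener, the same call BinaryLogClientStatistics makes in its constructor.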

BinaryLogClientStatistics

mysql-binlog-connector-java-0.20.1/src/main/java/com/github/shyiko/mysql/binlog/jmx/BinaryLogClientStatistics.java

public class BinaryLogClientStatistics implements BinaryLogClientStatisticsMXBean,
        BinaryLogClient.EventListener, BinaryLogClient.LifecycleListener {

    private AtomicReference<EventHeader> lastEventHeader = new AtomicReference<EventHeader>();
    private AtomicLong timestampOfLastEvent = new AtomicLong();
    private AtomicLong totalNumberOfEventsSeen = new AtomicLong();
    private AtomicLong totalBytesReceived = new AtomicLong();
    private AtomicLong numberOfSkippedEvents = new AtomicLong();
    private AtomicLong numberOfDisconnects = new AtomicLong();

    public BinaryLogClientStatistics() {
    }

    public BinaryLogClientStatistics(BinaryLogClient binaryLogClient) {
        binaryLogClient.registerEventListener(this);
        binaryLogClient.registerLifecycleListener(this);
    }

    @Override
    public String getLastEvent() {
        EventHeader eventHeader = lastEventHeader.get();
        return eventHeader == null ? null : eventHeader.getEventType() + "/" + eventHeader.getTimestamp() +
                " from server " + eventHeader.getServerId();
    }

    @Override
    public long getSecondsSinceLastEvent() {
        long timestamp = timestampOfLastEvent.get();
        return timestamp == 0 ? 0 : (getCurrentTimeMillis() - timestamp) / 1000;
    }

    @Override
    public long getSecondsBehindMaster() {
        // because lastEventHeader and timestampOfLastEvent are not guarded by the common lock
        // we may get some "distorted" results, though shouldn't be a problem given the nature of the final value
        long timestamp = timestampOfLastEvent.get();
        EventHeader eventHeader = lastEventHeader.get();
        if (timestamp == 0 || eventHeader == null) {
            return -1;
        }
        return (timestamp - eventHeader.getTimestamp()) / 1000;
    }

    @Override
    public long getTotalNumberOfEventsSeen() {
        return totalNumberOfEventsSeen.get();
    }

    @Override
    public long getTotalBytesReceived() {
        return totalBytesReceived.get();
    }

    @Override
    public long getNumberOfSkippedEvents() {
        return numberOfSkippedEvents.get();
    }

    @Override
    public long getNumberOfDisconnects() {
        return numberOfDisconnects.get();
    }

    @Override
    public void reset() {
        lastEventHeader.set(null);
        timestampOfLastEvent.set(0);
        totalNumberOfEventsSeen.set(0);
        totalBytesReceived.set(0);
        numberOfSkippedEvents.set(0);
        numberOfDisconnects.set(0);
    }

    @Override
    public void onEvent(Event event) {
        EventHeader header = event.getHeader();
        lastEventHeader.set(header);
        timestampOfLastEvent.set(getCurrentTimeMillis());
        totalNumberOfEventsSeen.getAndIncrement();
        totalBytesReceived.getAndAdd(header.getHeaderLength() + header.getDataLength());
    }

    @Override
    public void onEventDeserializationFailure(BinaryLogClient client, Exception ex) {
        numberOfSkippedEvents.getAndIncrement();
        lastEventHeader.set(null);
        timestampOfLastEvent.set(getCurrentTimeMillis());
        totalNumberOfEventsSeen.getAndIncrement();
    }

    @Override
    public void onDisconnect(BinaryLogClient client) {
        numberOfDisconnects.getAndIncrement();
    }

    @Override
    public void onConnect(BinaryLogClient client) {
    }

    @Override
    public void onCommunicationFailure(BinaryLogClient client, Exception ex) {
    }

    protected long getCurrentTimeMillis() {
        return System.currentTimeMillis();
    }

}
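  • BinaryLogClientStatistics implements the BinaryLogClient.EventListener interface (as well as LifecycleListener); its onEvent method updates lastEventHeader, timestampOfLastEvent, totalNumberOfEventsSeen, and totalBytesReceived, while onEventDeserializationFailure increments numberOfSkippedEvents and onDisconnect increments numberOfDisconnects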
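The lag arithmetic in the class above can be tried in isolation. The following is a simplified sketch, not the library class: LagStatsSketch is a hypothetical name, it stores only the header timestamp instead of the whole EventHeader, and it assumes both timestamps are in milliseconds, as the division by 1000 in the source suggests. It keeps the protected getCurrentTimeMillis hook, which makes the clock replaceable in tests.

```java
import java.util.concurrent.atomic.AtomicLong;

// Simplified stand-in for the statistics listener; hypothetical, not the library class.
class LagStatsSketch {

    private final AtomicLong headerTimestampMillis = new AtomicLong(); // stamped by the server
    private final AtomicLong receivedAtMillis = new AtomicLong();      // stamped locally on receipt
    private final AtomicLong eventsSeen = new AtomicLong();

    // Records one event, given the timestamp carried in its header.
    void onEvent(long eventHeaderTimestampMillis) {
        headerTimestampMillis.set(eventHeaderTimestampMillis);
        receivedAtMillis.set(getCurrentTimeMillis());
        eventsSeen.getAndIncrement();
    }

    // Receipt time minus server time, in seconds; -1 until the first event arrives.
    long getSecondsBehindMaster() {
        long received = receivedAtMillis.get();
        if (received == 0) {
            return -1;
        }
        return (received - headerTimestampMillis.get()) / 1000;
    }

    // Seconds elapsed since the last event was received; 0 if none was seen.
    long getSecondsSinceLastEvent() {
        long received = receivedAtMillis.get();
        return received == 0 ? 0 : (getCurrentTimeMillis() - received) / 1000;
    }

    long getEventsSeen() {
        return eventsSeen.get();
    }

    // Overridable clock, mirroring the protected hook in the original class.
    protected long getCurrentTimeMillis() {
        return System.currentTimeMillis();
    }

    public static void main(String[] args) {
        final long[] now = {10_000L}; // fake clock so the result is deterministic
        LagStatsSketch stats = new LagStatsSketch() {
            @Override
            protected long getCurrentTimeMillis() {
                return now[0];
            }
        };
        System.out.println(stats.getSecondsBehindMaster()); // prints -1
        stats.onEvent(7_000L);
        System.out.println(stats.getSecondsBehindMaster()); // prints 3
    }
}
```

As in the original, the two values are read without a common lock, so a reader racing with onEvent may briefly see a slightly skewed result.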

listenForEventPackets

mysql-binlog-connector-java-0.20.1/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java

public class BinaryLogClient implements BinaryLogClientMXBean {
    
    //......

    private void listenForEventPackets() throws IOException {
        ByteArrayInputStream inputStream = channel.getInputStream();
        boolean completeShutdown = false;
        try {
            while (inputStream.peek() != -1) {
                int packetLength = inputStream.readInteger(3);
                inputStream.skip(1); // 1 byte for sequence
                int marker = inputStream.read();
                if (marker == 0xFF) {
                    ErrorPacket errorPacket = new ErrorPacket(inputStream.read(packetLength - 1));
                    throw new ServerException(errorPacket.getErrorMessage(), errorPacket.getErrorCode(),
                        errorPacket.getSqlState());
                }
                if (marker == 0xFE && !blocking) {
                    completeShutdown = true;
                    break;
                }
                Event event;
                try {
                    event = eventDeserializer.nextEvent(packetLength == MAX_PACKET_LENGTH ?
                        new ByteArrayInputStream(readPacketSplitInChunks(inputStream, packetLength - 1)) :
                        inputStream);
                    if (event == null) {
                        throw new EOFException();
                    }
                } catch (Exception e) {
                    Throwable cause = e instanceof EventDataDeserializationException ? e.getCause() : e;
                    if (cause instanceof EOFException || cause instanceof SocketException) {
                        throw e;
                    }
                    if (isConnected()) {
                        for (LifecycleListener lifecycleListener : lifecycleListeners) {
                            lifecycleListener.onEventDeserializationFailure(this, e);
                        }
                    }
                    continue;
                }
                if (isConnected()) {
                    eventLastSeen = System.currentTimeMillis();
                    updateGtidSet(event);
                    notifyEventListeners(event);
                    updateClientBinlogFilenameAndPosition(event);
                }
            }
        } catch (Exception e) {
            if (isConnected()) {
                for (LifecycleListener lifecycleListener : lifecycleListeners) {
                    lifecycleListener.onCommunicationFailure(this, e);
                }
            }
        } finally {
            if (isConnected()) {
                if (completeShutdown) {
                    disconnect(); // initiate complete shutdown sequence (which includes keep alive thread)
                } else {
                    disconnectChannel();
                }
            }
        }
    }

    private void notifyEventListeners(Event event) {
        if (event.getData() instanceof EventDataWrapper) {
            event = new Event(event.getHeader(), ((EventDataWrapper) event.getData()).getExternal());
        }
        for (EventListener eventListener : eventListeners) {
            try {
                eventListener.onEvent(event);
            } catch (Exception e) {
                if (logger.isLoggable(Level.WARNING)) {
                    logger.log(Level.WARNING, eventListener + " choked on " + event, e);
                }
            }
        }
    }

    //......

}
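  • The listenForEventPackets method reads from channel.getInputStream() and deserializes each packet into an event via eventDeserializer.nextEvent; if the client is still connected, it then calls updateGtidSet, notifyEventListeners, and updateClientBinlogFilenameAndPosition. A deserialization failure that is not caused by an EOFException or SocketException is reported through LifecycleListener.onEventDeserializationFailure and the loop continues. notifyEventListeners iterates over eventListeners and calls onEvent on each one; an exception thrown by a listener is caught and logged at WARNING level, so it does not stop the remaining listeners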
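The per-listener try/catch in notifyEventListeners is what isolates listeners from one another. Below is a minimal sketch of that dispatch pattern under stated assumptions: DispatchSketch and its nested Listener are hypothetical stand-ins, events are plain strings, and the CopyOnWriteArrayList is a choice made for this sketch rather than something shown in the excerpt above.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical stand-in for the dispatch part of the client; events are plain strings here.
class DispatchSketch {

    interface Listener {
        void onEvent(String event);
    }

    private static final Logger logger = Logger.getLogger(DispatchSketch.class.getName());

    // Assumption of this sketch: a list that is safe to iterate while listeners are added.
    private final List<Listener> listeners = new CopyOnWriteArrayList<>();

    void register(Listener listener) {
        listeners.add(listener);
    }

    // Delivers the event to every listener; returns how many completed without throwing.
    int dispatch(String event) {
        int completed = 0;
        for (Listener listener : listeners) {
            try {
                listener.onEvent(event);
                completed++;
            } catch (Exception e) {
                // Log and move on, so one failing listener cannot starve the others.
                if (logger.isLoggable(Level.WARNING)) {
                    logger.log(Level.WARNING, listener + " choked on " + event, e);
                }
            }
        }
        return completed;
    }

    public static void main(String[] args) {
        DispatchSketch client = new DispatchSketch();
        client.register(event -> {
            throw new IllegalStateException("boom");
        });
        client.register(event -> System.out.println("received " + event));
        // The first listener fails and is logged; the second still receives the event.
        System.out.println("completed: " + client.dispatch("XID"));
    }
}
```

One consequence of this design is that a listener cannot signal failure back to the client by throwing; error handling has to happen inside onEvent itself.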

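Summary

The EventListener interface defines the onEvent method. BinaryLogClientStatistics implements BinaryLogClient.EventListener, and its onEvent method updates lastEventHeader, timestampOfLastEvent, totalNumberOfEventsSeen, and totalBytesReceived. BinaryLogClient delivers each deserialized event to every registered listener through notifyEventListeners.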
