聊聊debezium的SimpleSourceConnector

智能合
• 阅读 1173

本文主要研究一下debezium的SimpleSourceConnector

SimpleSourceConnector

debezium-v1.1.1.Final/debezium-embedded/src/main/java/io/debezium/connector/simple/SimpleSourceConnector.java

public class SimpleSourceConnector extends SourceConnector {

    protected static final String VERSION = "1.0";

    public static final String TOPIC_NAME = "topic.name";
    public static final String RECORD_COUNT_PER_BATCH = "record.count.per.batch";
    public static final String BATCH_COUNT = "batch.count";
    public static final String DEFAULT_TOPIC_NAME = "simple.topic";
    public static final String INCLUDE_TIMESTAMP = "include.timestamp";
    public static final String RETRIABLE_ERROR_ON = "error.retriable.on";
    public static final int DEFAULT_RECORD_COUNT_PER_BATCH = 1;
    public static final int DEFAULT_BATCH_COUNT = 10;
    public static final boolean DEFAULT_INCLUDE_TIMESTAMP = false;

    private Map<String, String> config;

    public SimpleSourceConnector() {
    }

    @Override
    public String version() {
        return VERSION;
    }

    @Override
    public void start(Map<String, String> props) {
        config = props;
    }

    @Override
    public Class<? extends Task> taskClass() {
        return SimpleSourceConnector.SimpleConnectorTask.class;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        List<Map<String, String>> configs = new ArrayList<>();
        configs.add(config);
        return configs;
    }

    @Override
    public void stop() {
        // do nothing
    }

    @Override
    public ConfigDef config() {
        return null;
    }

    //......

}
  • SimpleSourceConnector继承了kafka的SourceConnector,其taskClass返回的是SimpleSourceConnector.SimpleConnectorTask.class

SimpleConnectorTask

debezium-v1.1.1.Final/debezium-embedded/src/main/java/io/debezium/connector/simple/SimpleSourceConnector.java

    public static class SimpleConnectorTask extends SourceTask {

        private int recordsPerBatch;
        private int errorOnRecord;
        private Queue<SourceRecord> records;
        private final AtomicBoolean running = new AtomicBoolean();
        private List<SourceRecord> retryRecords = null;

        @Override
        public String version() {
            return VERSION;
        }

        @Override
        public void start(Map<String, String> props) {
            if (running.compareAndSet(false, true)) {
                Configuration config = Configuration.from(props);
                recordsPerBatch = config.getInteger(RECORD_COUNT_PER_BATCH, DEFAULT_RECORD_COUNT_PER_BATCH);
                int batchCount = config.getInteger(BATCH_COUNT, DEFAULT_BATCH_COUNT);
                String topic = config.getString(TOPIC_NAME, DEFAULT_TOPIC_NAME);
                boolean includeTimestamp = config.getBoolean(INCLUDE_TIMESTAMP, DEFAULT_INCLUDE_TIMESTAMP);
                errorOnRecord = config.getInteger(RETRIABLE_ERROR_ON, -1);

                // Create the partition and schemas ...
                Map<String, ?> partition = Collect.hashMapOf("source", "simple");
                Schema keySchema = SchemaBuilder.struct()
                        .name("simple.key")
                        .field("id", Schema.INT32_SCHEMA)
                        .build();
                Schema valueSchema = SchemaBuilder.struct()
                        .name("simple.value")
                        .field("batch", Schema.INT32_SCHEMA)
                        .field("record", Schema.INT32_SCHEMA)
                        .field("timestamp", Schema.OPTIONAL_INT64_SCHEMA)
                        .build();

                // Read the offset ...
                Map<String, ?> lastOffset = context.offsetStorageReader().offset(partition);
                long lastId = lastOffset == null ? 0L : (Long) lastOffset.get("id");

                // Generate the records that we need ...
                records = new LinkedList<>();
                long initialTimestamp = System.currentTimeMillis();
                int id = 0;
                for (int batch = 0; batch != batchCount; ++batch) {
                    for (int recordNum = 0; recordNum != recordsPerBatch; ++recordNum) {
                        ++id;
                        if (id <= lastId) {
                            // We already produced this record, so skip it ...
                            continue;
                        }
                        if (!running.get()) {
                            // the task has been stopped ...
                            return;
                        }
                        // We've not seen this ID yet, so create a record ...
                        Map<String, ?> offset = Collect.hashMapOf("id", id);
                        Struct key = new Struct(keySchema);
                        key.put("id", id);
                        Struct value = new Struct(valueSchema);
                        value.put("batch", batch + 1);
                        value.put("record", recordNum + 1);
                        if (includeTimestamp) {
                            value.put("timestamp", initialTimestamp + id);
                        }
                        SourceRecord record = new SourceRecord(partition, offset, topic, 1, keySchema, key, valueSchema, value);
                        records.add(record);
                    }
                }
            }
        }

        @Override
        public List<SourceRecord> poll() throws InterruptedException {
            if (records.isEmpty()) {
                // block forever, as this thread will be interrupted if/when the task is stopped ...
                new CountDownLatch(1).await();
            }
            if (running.get()) {
                if (retryRecords != null) {
                    final List<SourceRecord> r = retryRecords;
                    retryRecords = null;
                    return r;
                }
                // Still running, so process whatever is in the queue ...
                List<SourceRecord> results = new ArrayList<>();
                int record = 0;
                while (record < recordsPerBatch && !records.isEmpty()) {
                    record++;
                    final SourceRecord fetchedRecord = records.poll();
                    final Integer id = ((Struct) (fetchedRecord.key())).getInt32("id");
                    results.add(fetchedRecord);
                    if (id == errorOnRecord) {
                        retryRecords = results;
                        throw new RetriableException("Error on record " + errorOnRecord);
                    }
                }
                return results;
            }
            // No longer running ...
            return null;
        }

        @Override
        public void stop() {
            // Request the task to stop and return immediately ...
            running.set(false);
        }
    }
  • SimpleConnectorTask继承了kafka的SourceTask,其start方法主要是根据batchCount来创建SourceRecord;其stop方法设置running为false;其poll方法主要是执行records.poll()

小结

SimpleSourceConnector继承了kafka的SourceConnector,其taskClass返回的是SimpleSourceConnector.SimpleConnectorTask.class;SimpleConnectorTask继承了kafka的SourceTask,其start方法主要是根据batchCount来创建SourceRecord;其stop方法设置running为false;其poll方法主要是执行records.poll()

doc

点赞
收藏
评论区
推荐文章
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
美凌格栋栋酱 美凌格栋栋酱
7个月前
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(
科工人 科工人
4年前
聊聊golang的DDD项目结构
序本文主要研究一下golang的DDD项目结构interfacesfoodappserver/interfacesinterfacesgit:(master)tree.|____fileupload||____fileformat.go||____fileupload.go|____food_handler.go|__
Stella981 Stella981
3年前
Debezium日常运维手机
关于KafkaConnect:(1)是否可以动态添加已有数据的新表? 不可以,KafkaConnect需要配置先行。如果是已有数据的新表,无法通过修改已有的kafkaconnect配置进行新表的Snapshot初始化。建议通过tablewhitelist功能,进行新表的snapshot。然后等到稳定后,再合并到同一个Kafkacon
Stella981 Stella981
3年前
Debezium接入Mysql遇到到的Tinyint坑
问题背景:在Debezium做数据初始化的时候,对于一些tinyint字段的值,出现0,1的值的异常。经过源码排查,数据在JDBC上面,读取到的数据是Boolean值。通过排查,原来是MYSQL特有的数据问题,需要在JDBC上面加上关键字,问题解决。JAVA数据类型和MYSQL的数据类型转换,要注意tinyInt类型,且存储长度为1的情
Stella981 Stella981
3年前
Debezium 处理 mysql timestamp 的坑
使用Debezium订阅mysqlbinlog Debezium对于Timestamp的处理,会变成字符串,处理的核心代码是:ZonedDateTimeexpectedTimestampZonedDateTime.of(LocalDateTime.parse("20140908T17:51:04.780"),
Wesley13 Wesley13
3年前
FLV文件格式
1.        FLV文件对齐方式FLV文件以大端对齐方式存放多字节整型。如存放数字无符号16位的数字300(0x012C),那么在FLV文件中存放的顺序是:|0x01|0x2C|。如果是无符号32位数字300(0x0000012C),那么在FLV文件中的存放顺序是:|0x00|0x00|0x00|0x01|0x2C。2.  
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Stella981 Stella981
3年前
Debezium 采坑2
文档中没有写的一个参数,snapshot.new.tables可以新增snapshot表publicstaticfinalFieldSNAPSHOT_NEW_TABLESField.create("snapshot.new.tables")
Stella981 Stella981
3年前
RedisTemplate读取slowlog
序本文主要研究一下如何使用RedisTemplate(lettuce类库)读取slowlogmaven<dependency<groupIdorg.springframework.boot</groupId<artifactIdspringbootstarterdata
智能合
智能合
Lv1
近城远山,都是人间。
文章
4
粉丝
0
获赞
0