ClickHouse最佳实战之分布表写入流程分析

Stella981
• 阅读 668

ClickHouse最佳实战之分布表写入流程分析

云妹导读:

前不久,京东智联云正式上线了基于Clickhouse的分析型云数据库JCHDB,一经推出便受到广大用户的极大关注。有兴趣的小伙伴可以回顾上一篇文章**《比MySQL快839倍!揭开分析型数据库JCHDB的神秘面纱》**。

ClickHouse像ElasticSearch一样具有数据分片(shard)的概念,这也是分布式存储的特点之一,即通过并行读写提高效率。ClickHouse依靠Distributed引擎实现了Distributed(分布式)表机制,在所有分片(本地表)上建立视图进行分布式查询,使用很方便。

ClickHouse最佳实战之分布表写入流程分析

Distributed表引擎是**一种特殊的表引擎,自身不会存储任何数据,而是通过读取或写入其他远端节点上的表进行数据处理的表引擎。**该表引擎需要依赖各个节点的本地表来创建,本地表的存在是Distributed表创建的依赖条件,创建语句如下:

CREATE TABLE {teble} ON CLUSTER {cluster}
AS {local_table}
ENGINE= Distributed({cluster}, {database}, {local_table},{policy})

这里的policy一般可以使用随机(例如rand())或哈希(例如halfMD5hash(id))。

再来看下ClickHouse集群节点配置文件,相关参数如下:

<remote_servers>
    <logs>
        <shard>
            <weight>1</weight>
            <internal_replication>true</internal_replication>
            <replica>
                <priority>1</priority>
                <host>example01-01-1</host>
                <port>9000</port>
            </replica>
            <replica>
                <host>example01-01-2</host>
                <port>9000</port>
            </replica>
        </shard>
        <shard>
            <weight>2</weight>
            <internal_replication>true</internal_replication>
            <replica>
                <host>example01-02-1</host>
                <port>9000</port>
            </replica>
            <replica>
                <host>example01-02-2</host>
                <port>9000</port>
            </replica>
        </shard>
    </logs>
</remote_servers>

ClickHouse最佳实战之分布表写入流程分析

有了上面的基础了解,就将进入主题了,本文主要是对Distributed表如何写入及如何分发做一下分析,略过SQL的词法解析、语法解析等步骤,从写入流开始,其构造方法如下:

DistributedBlockOutputStream(const Context & context_, StorageDistributed &
storage_, const ASTPtr & query_ast_, const ClusterPtr & cluster_, bool
insert_sync_, UInt64 insert_timeout_);

如果insert_sync_为true,表示是同步写入,并配合insert_timeout_参数使用(insert_timeout_为零表示没有超时时间);如果insert_sync_为false,表示写入是异步。

1,同步写入还是异步写入

同步写入是指数据直写入实际的表中,而异步写入是指数据首先被写入本地文件系统,然后发送到远端节点。

BlockOutputStreamPtr StorageDistributed::write(const ASTPtr &, const Context &
context)
{
   ......

   /// Force sync insertion if it is remote() table function
   bool insert_sync = settings.insert_distributed_sync || owned_cluster;
   auto timeout = settings.insert_distributed_timeout;
   /// DistributedBlockOutputStream will not own cluster, but will own 
ConnectionPools of the cluster
   return std::make_shared(
       context, *this, createInsertToRemoteTableQuery(remote_database,
remote_table, getSampleBlockNonMaterialized()), cluster,
       nsert_sync, timeout);
}

是否执行同步写入是由insert_sync决定的,最终是由是否配置insert_distributed_sync(默认为false)和owned_cluster值的或关系决定的,一般在使用MergeTree之类的普通表引擎时,通常是异步写入,但在使用表函数时(使用owned_cluster来判断是否是表函数),通常会使用同步写入。这也是在设计业务逻辑时需要注意的。

owned_cluster是什么时候赋值的呢?

StoragePtr TableFunctionRemoteexecuteImpl(const ASTPtr & astfunction, const Context & 
context, const stdstring & tablename) const
{ 
 ......
 StoragePtr res = remotetablefunction_ptr
     ? StorageDistributed::createWithOwnCluster(
       table_name,
       structureremotetable,
       remotetablefunction_ptr,
       cluster,
       context)
     : StorageDistributed::createWithOwnCluster(
       table_name,
       structureremotetable,
       remote_database,
       remote_table,
       cluster,
       context);
 ......
}  
StoragePtr StorageDistributed::createWithOwnCluster(
  const std::string & tablename, 
  const ColumnsDescription & columns_,
  ASTPtr & remotetablefunctionptr, 
  ClusterPtr & ownedcluster, 
  const Context & context_)
{ 
  auto res = create(String{}, tablename, columns, ConstraintsDescription{}, 
remotetablefunctionptr, String{}, context, ASTPtr(), String(), false);
  res->ownedcluster = ownedcluster_;
  return res;
}

可以发现在创建remote表时会根据remote_table_function_ptr参数对最终的owned_cluster_赋值为true。

2,异步写入是如何实现的

了解了什么时候使用同步写入什么时候异步写入后,再继续分析正式的写入过程,同步写入一般场景中涉及较少,这里主要对异步写入逻辑进行分析。outStream的write方法主逻辑如下:

DistributedBlockOutputStream::write()
                 ↓
            if insert_sync
             |         |
           true      false
             ↓         ↓
      writeSync()   writeAsync()   

其实这个write方法是重写了virtual void IBlockOutputStream::write(const Block & block),所以节点在接收到流并调用流的write方法就会进入该逻辑中。并且根据insert_sync来决定走同步写还是异步写。

3,写入本地节点还是远端节点

主要还是对异步写入进行分析,其实writeAsync()最终的实现方法是writeAsyncImpl(),大致逻辑图如下:

         writeAsyncImpl()
               ↓
 if shard_info.hasInternalReplication()
    |                          |
   true                       false
    ↓                          ↓
writeToLocal()             writeToLocal()
    ↓                          ↓
writeToShard()        for(every shard){writeToShard()}
    ↓                          ↓ 
   end                        end

其中getShardsInfo()方法就是获取config.xml配置文件中获取集群节点信息,hasInternalReplication()就对应着配置文件中的internal_replication参数,如果为true,就会进入最外层的if逻辑,否则就会进入else逻辑。

其中writeToLocal()方法是相同的,是指如果shard包含本地节点,优先选择本地节点进行写入;后半部分writeToShard()就是根据internal_replication参数的取值来决定是写入其中一个远端节点,还是所有远端节点都写一次。

4,数据如何写入本地节点

当然一般情况Distributed表还是基于ReplicatedMergeTree系列表进行创建,而不是基于表函数的,所以大多数场景还是会先写入本地再分发到远端节点。那写入Distributed表的数据是如何保证原子性落盘而不会在数据正在写入的过程中就把不完整的数据发送给远端其他节点呢?看下writeToShard()方法大致逻辑,如下:

     writeToShard()
          ↓
for(every dir_names){
          |
          └──if first iteration
                 |       |
               false     true
                 ↓       ↓ 
                 |       ├──storage.requireDirectoryMonitor()
                 |       ├──CompressedWriteBuffer
                 |       ├──writeStringBinary()
                 |       ├──stream.writePrefix()
                 |       ├──stream.write(block)
                 |       ├──stream.writeSuffix()
                 ↘     ↙ 
             link(tmp_file, file) 
                    └──}     

继续具体再看下源码的具体实现,如下:

void DistributedBlockOutputStream::writeToShard(const Block & block, const
std::vector<std::string> & dir_names) 
{
   /** tmp directory is used to ensure atomicity of transactions
     * and keep monitor thread out from reading incomplete data
     */
   std::string first_file_tmp_path{};

   auto first = true;

   /// write first file, hardlink the others
   for (const auto & dir_name : dir_names)
   {
       const auto & path = storage.getPath() + dir_name + '/';

       /// ensure shard subdirectory creation and notify storage
       if (Poco::File(path).createDirectory())
           storage.requireDirectoryMonitor(dir_name);

       const auto & file_name = toString(storage.file_names_increment.get()) +
".bin";
       const auto & block_file_path = path + file_name;

       /** on first iteration write block to a temporary directory for 
subsequent hardlinking to ensure
           * the inode is not freed until we're done */
       if (first)
       {
           first = false;

           const auto & tmp_path = path + "tmp/";
           Poco::File(tmp_path).createDirectory();
           const auto & block_file_tmp_path = tmp_path + file_name;

           first_file_tmp_path = block_file_tmp_path;

           WriteBufferFromFile out{block_file_tmp_path};
           CompressedWriteBuffer compress{out};
           NativeBlockOutputStream stream{compress, ClickHouseRevision::get(),
block.cloneEmpty()};

           writeVarUInt(UInt64(DBMS_DISTRIBUTED_SENDS_MAGIC_NUMBER), out);
           context.getSettingsRef().serialize(out);
           writeStringBinary(query_string, out);

          stream.writePrefix();
          stream.write(block);
          stream.writeSuffix();
       }

       if (link(first_file_tmp_path.data(), block_file_path.data()))
           throwFromErrnoWithPath("Could not link " + block_file_path + " to "
+ first_file_tmp_path, block_file_path,
                  ErrorCodes::CANNOT_LINK);
   }
       ......
}

首先来了解下Distributed表在目录中的存储方式,默认位置都是/var/lib/clickhouse/data/{database}/{table}/在该目录下会为每个shard生成不同的目录,其中存放需要发送给该shard的数据文件,例如:

[root@ck test]# tree
.
├── 'default@ck2-0:9000,default@ck2-1:9000'
│   ├── 25.bin
│   └── tmp
│   └── 26.bin
└── 'default@ck3-0:9000,default@ck3-1:9000'
└── tmp 

可以发现每个shard对应的目录名是{darabse}@{hostname}:{tcpPort}的格式,如果多个副本会用,分隔。并且每个shard目录中还有个tmp目录,这个目录的设计在writeToShard()方法中做了解释,是为了避免数据文件在没写完就被发送到远端。

数据文件在本地写入的过程中会先写入tmp路径中,写完后通过硬链接link到shard目录,保证只要在shard目录中出现的数据文件都是完整写入的数据文件。

数据文件的命名是通过全局递增的数字加.bin命名,是为了在后续分发到远端节点保持顺序性。

5,数据如何分发到各个节点

细心的你可能已经发现在writeToShard()方法中有个requireDirectoryMonitor(),这个方法就是将shard目录注册监听,并通过专用类StorageDistributedDirectoryMonitor来实现数据文件的分发,根据不同配置可以实现逐一分发或批量分发。并且包含对坏文件的容错处理。

ClickHouse最佳实战之分布表写入流程分析

分析到这,可能还有人会觉得云里雾里,觉得整个流程串不起来,其实这样写是为了先不影响Distributed表写入的主流程,明白了这个再附加上sharding_key拆分和权重拆分就很好理解了。

ClickHouse最佳实战之分布表写入流程分析

上面提到过writeAsync()的最终实现方法是writeAsyncImpl,这个说法是没问题的,但是中间还有段关键逻辑,如下:

                      writeAsync()
                           ↓
if storage.getShardingKeyExpr() && (cluster->getShardsInfo().size() > 1
               |                       |
             true                     false
              ↓                        ↓
      writeAsyncImpl(block)      writeSplitAsync(block)
                                        ↓
                                   splitBlock(block)
                                        ↓
                        writeAsyncImpl(splitted_blocks,shard_idx)

getShardingKeyExpr()方法就是去获取sharding_key生成的表达式指针,该表达式是在创建表时就生成的,如下:

sharding_key_expr = buildShardingKeyExpression(sharding_key_, global_context,
getColumns().getAllPhysical(), false);

那sharding_key和sharding_key_expr是什么关系呢?如下:

const ExpressionActionsPtr & getShardingKeyExpr() const { return 
sharding_key_expr; }

所以说sharding_key_expr最终主要就是由sharding_key决定的。

一般情况下getShardingKeyExpr()方法都为true,如果再满足shard数量大于1,就会对block进行拆分,由splitBlock()方法主要逻辑就是创建selector并使用selector进行切割,大致逻辑如下:

             splitBlock()
                  ↓
           createSelector(block)
                  ↓
for(every shard){column->scatter(num_shards, selector);}

对于如何创建selector以及selector中都做了什么事儿,来具体看下源码截取,如下:

IColumn::Selector DistributedBlockOutputStream::createSelector(const Block &
source_block)
{
    Block current_block_with_sharding_key_expr = source_block;
    storage.getShardingKeyExpr()- 
>execute(current_block_with_sharding_key_expr);

    const auto & key_column =
current_block_with_sharding_key_expr.getByName(storage.getShardingKeyColumnName
());
    const auto & slot_to_shard = cluster->getSlotToShard();
    ......
   throw Exception{"Sharding key expression does not evaluate to an integer 
type", ErrorCodes::TYPE_MISMATCH};
}

ClickHouse最佳实战之分布表写入流程分析

看splitBlock()方法,ClickHouse是利用createSelector()方法构造selector来进行后续的处理。在createSelector()方法中最重要的就是key_column和slot_to_shard。

key_column是通过sharding_key间接获得的,是为了根据主键列进行切割;slot_to_shard是shard插槽,这里就是为了处理权重,在后续向插槽中插入数据时就会结合config.xml中的weight进行按比例处理。

细节比较复杂这里不做太细致的分析,有兴趣可以自行看下(如template IColumn::Selector createBlockSelector())。

到此,对于Distributed表的写入流程的关键点就大致分析完了。篇幅有限有些细节没有做过多说明,有兴趣的可以自行再了解下。

ClickHouse最佳实战之分布表写入流程分析

通过对Distributed表写入流程的分析,了解了该类型表的实际工作原理,所以在实际应用中有几个点还需要关注一下:

  1. Distributed表在写入时会在本地节点生成临时数据,会产生写放大,所以会对CPU及内存造成一些额外消耗,建议尽量少使用Distributed表进行写操作;
  2. Distributed表写的临时block会把原始block根据sharding_key和weight进行再次拆分,会产生更多的block分发到远端节点,也增加了merge的负担;
  3. Distributed表如果是基于表函数创建的,一般是同步写,需要注意。

了解原理才能更好的使用,遇到问题才能更好的优化。

点击【**阅读原文**】即可前往京东智联云控制台开通试用JCHDB。

点赞
收藏
评论区
推荐文章
blmius blmius
2年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Jacquelyn38 Jacquelyn38
2年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Peter20 Peter20
3年前
mysql中like用法
like的通配符有两种%(百分号):代表零个、一个或者多个字符。\(下划线):代表一个数字或者字符。1\.name以"李"开头wherenamelike'李%'2\.name中包含"云",“云”可以在任何位置wherenamelike'%云%'3\.第二个和第三个字符是0的值wheresalarylike'\00%'4\
Easter79 Easter79
2年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
2年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
2年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
2年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
2年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
3个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这