「Elasticsearch」ES重建索引怎么才能做到数据无缝迁移呢?

DAO组织
• 阅读 4146

背景

众所周知,Elasticsearch是⼀个实时的分布式搜索引擎,为⽤户提供搜索服务。当我们决定存储某种数据,在创建索引的时候就需要将数据结构,即Mapping确定下来,于此同时索引的设定和很多固定配置将不能改变。
<!-- more -->
那如果后续业务发生变化,需要改变数据结构或者更换ES更换分词器怎么办呢?为此,Elastic团队提供了很多通过辅助⼯具来帮助开发⼈员进⾏重建索引的方案。
如果对 reindex API 不熟悉,那么在遇到重构的时候,必然事倍功半,效率低下。反之,就可以方便地进行索引重构,省时省力。

步骤

假设之前我们已经存在一个blog索引,因为更换分词器需要对该索引中的数据进行重建索引,以便支持业务使用新的分词规则搜索数据,并且尽可能使这个变化对外服务没有感知,大概分为以下几个步骤:​

  • 新增⼀个索引blog_lastest,Mapping数据结构与blog索引一致
  • blog数据同步至blog_lastest
  • 删除blog索引
  • 数据同步后给blog_lastest添加别名blog

新建索引

在这里推荐一个ES管理工具Kibana,主要针对数据的探索、可视化和分析。

「Elasticsearch」ES重建索引怎么才能做到数据无缝迁移呢?

put /blog_lastest/
{
    "mappings":{
        "properties":{
            "title":{
                "type":"text",
                "analyzer":"ik_max_word"
            },
            "author":{
                "type":"keyword",
                "fields":{
                    "seg":{
                        "type":"text",
                        "analyzer":"ik_max_word"
                    }
                }
            }
        }
    }
}

将旧索引数据copy到新索引

同步等待

接⼝将会在 reindex 结束后返回

POST /_reindex
{
    "source": {
        "index": "blog"
    },
    "dest": {
        "index": "blog_lastest"
    }
}

kibana 中的使用如下所示
「Elasticsearch」ES重建索引怎么才能做到数据无缝迁移呢?

当然高版本(7.1.1)中,ES都有提供对应的Java REST Client,比如

ReindexRequest reindexRequest = new ReindexRequest();
reindexRequest.setSourceIndices("blog").setSource.setDestIndex("blog_lastest");
TaskSubmissionResponse taskSubmissionResponse = client.submitReindexTask(reindexRequest, RequestOptions.DEFAULT);

为了防止赘述,接下来举例全部以kibana中请求介绍,如果有需要用Java REST Client,可以自行去ES官网查看。

异步执⾏

如果 reindex 时间过⻓,建议加上 wait_for_completion=false 的参数条件,这样 reindex 将直接返回 taskId

POST /_reindex?wait_for_completion=false
{
    "source": {
        "index": "blog"
    },
    "dest": {
        "index": "blog_lastest"
    }
}

返回:

{
  "task" : "dpBihNSMQfSlboMGlTgCBA:4728038"
}

op_type 参数

op_type 参数控制着写入数据的冲突处理方式,如果把 op_type 设置为 create【默认值】,在 _reindex API 中,表示写入时只在 dest index中添加不存在的 doucment,如果相同的 document 已经存在,则会报 version confilct 的错误,那么索引操作就会失败。【这种方式与使用 _create API 时效果一致】

POST _reindex
{
  "source": {
    "index": "blog"
  },
  "dest": {
    "index": "blog_lastest",
    "op_type": "create"
  }
}

如果这样设置了,也就不存在更新数据的场景了【冲突数据无法写入】,我们也可以把 op_type 设置为 index,表示所有的数据全部重新索引创建。

conflicts 配置

默认情况下,当发生 version conflict 的时候,_reindex 会被 abort,任务终止【此时数据还没有 reindex 完成】,在返回体中的 failures 指标中会包含冲突的数据【有时候数据会非常多】,除非把 conflicts 设置为 proceed

关于 abort 的说明,如果产生了 abort,已经执行的数据【例如更新写入的】仍然存在于目标索引,此时任务终止,还会有数据没有被执行,也就是漏数了。换句话说,该执行过程不会回滚,只会终止。如果设置了 proceed,任务在检测到数据冲突的情况下,不会终止,会跳过冲突数据继续执行,直到所有数据执行完成,此时不会漏掉正常的数据,只会漏掉有冲突的数据。

POST _reindex
{
  "source": {
    "index": "blog"
  },
  "dest": {
    "index": "blog_lastest",
    "op_type": "create"
  },
  "conflicts": "proceed"
}

我们可以故意把 op_type 设置为 create,人为制造数据冲突的场景,测试时更容易观察到冲突现象。

如果把 conflicts 设置为 proceed,在返回体结果中不会再出现 failures 的信息,但是通过 version_conflicts 指标可以看到具体的数量。

批次大小配置

当你发现reindex的速度有些慢的时候,可以在 query 参数的同一层次【即 source 参数中】添加 size 参数,表示 scroll size 的大小【会影响批次的次数,进而影响整体的速度】,如果不显式设置,默认是一批 1000 条数据,在一开始的简单示例中也看到了。
如下,设置 scroll size 为 5000:

POST /_reindex?wait_for_completion=false
{
    "source": {
        "index": "blog",
        "size":5000
    },
    "dest": {
        "index": "blog_lastest",
        "op_type": "create"
    },
    "conflicts": "proceed"
}

测试后,速度达到了 30 分钟 500 万左右,明显提升了很多。

根据taskId可以实时查看任务的执行状态

一般来说,如果我们的 source index 很大【比如几百万数据量】,则可能需要比较长的时间来完成 _reindex 的工作,可能需要几十分钟。而在此期间不可能一直等待结果返回,可以去做其它事情,如果中途需要查看进度,可以通过 _tasks API 进行查看。

GET /_tasks/{taskId}

返回:

{
  "completed" : false,
  "task" : {
    "node" : "dpBihNSMQfSlboMGlTgCBA",
    "id" : 4704218,
    "type" : "transport",
    "action" : "indices:data/write/reindex",
    ……
}

当执行完毕时,completed为true
查看任务进度以及取消任务,除了根据taskId查看以外,我们还可以通过查看所有的任务中筛选本次reindex的任务。

GET _tasks?detailed=true&actions=*reindex

返回结果:

{
  "nodes" : {
    "dpBihNSMQfSlboMGlTgCBA" : {
      "name" : "node-16111-9210",
      "transport_address" : "192.168.XXX.XXX:9310",
      "host" : "192.168.XXX.XXX",
      "ip" : "192.168.16.111:9310",
      "roles" : [
        "ingest",
        "master"
      ],
      "attributes" : {
        "xpack.installed" : "true",
        "transform.node" : "false"
      },
      "tasks" : {
        "dpBihNSMQfSlboMGlTgCBA:6629305" : {
          "node" : "dpBihNSMQfSlboMGlTgCBA",
          "id" : 6629305,
          "type" : "transport",
          "action" : "indices:data/write/reindex",
          "status" : {
            "total" : 8361421,
            "updated" : 0,
            "created" : 254006,
            "deleted" : 0,
            "batches" : 743,
            "version_conflicts" : 3455994,
            "noops" : 0,
            "retries" : {
              "bulk" : 0,
              "search" : 0
            },
            "throttled_millis" : 0,
            "requests_per_second" : -1.0,
            "throttled_until_millis" : 0
          },
          "description" : "reindex from [blog] to [blog_lastest][_doc]",
          "start_time_in_millis" : 1609338953464,
          "running_time_in_nanos" : 1276738396689,
          "cancellable" : true,
          "headers" : { }
        }
      }
    }
  }
}

注意观察里面的几个重要指标,例如从 description 中可以看到任务描述,从 tasks 中可以找到任务的 id【例如 dpBihNSMQfSlboMGlTgCBA:6629305】,从 cancellable 可以判断任务是否支持取消操作。
这个 API 其实就是模糊匹配,同理也可以查询其它类型的任务信息,例如使用 GET _tasks?detailed=true&actions=*byquery 查看查询请求的状态。
当集群的任务太多时我们就可以根据task_id,也就是上面提到GET /_tasks/task_id 方式更加准确地查询指定任务的状态,避免集群的任务过多,不方便查看。
如果遇到操作失误的场景,想取消任务,有没有办法呢?
当然有啦,虽然覆水难收,通过调用
_tasks API

POST _tasks/task_id/_cancel

这里的 task_id 就是通过上面的查询任务接口获取的任务id(任务要支持取消操作,即【cancellable 为 true】时方能收效)。

删除旧索引

当我们通过 API 查询发现任务完成后,就可以进行后续操作,我这里是要删除旧索引,然后再给新索引起别名,用于替换旧索引,这样才能保证对外服务没有任何感知。

DELETE /blog

使用别名

POST /_aliases
{
    "actions":[
        {
            "add":{
                "index":"blog_lastest",
                "alias":"blog"
            }
        }
    ]
}

通过别名访问新索引

进行过以上操作后,我们可以使用一个简单的搜索验证服务。

POST /blog/_search
{
    "query": {
        "match": {
            "author": "james"
        }
    }
}

如果搜索结果达到我们的预期目标,至此,数据索引重建迁移完成。

本文可转载,但需声明原文出处。 程序员小明,一个很少加班的程序员。欢迎关注微信公众号“程序员小明”,获取更多优质文章。
点赞
收藏
评论区
推荐文章
Irene181 Irene181
4年前
手把手教你使用Flask搭建ES搜索引擎(预备篇)
/1前言/Elasticsearch是一个开源的搜索引擎,建立在一个全文搜索引擎库ApacheLucene™基础之上。那么如何实现Elasticsearch和Python的对接成为我们所关心的问题了(怎么什么都要和Python关联啊)。/2 Python交互/所以,Python也就提供了可以对接Elasti
Wesley13 Wesley13
3年前
ES添加字段
背景Elasticsearch是schemaless的数据存储方案。可以任意的向索引中添加字段。在此需明确以下背景:1.ES新添加的字段只对新数据、新type起作用;原有已经索引的数据不会生效;2.为加快ES的检索和索引效率,构建索引时会指定其mapping结构;添加索引字段即修改mapping;3.目前我们采用两种索引方案
Stella981 Stella981
3年前
ElasticSearch(增put、删delete、改(本质是先删除后添加)post、查get、post)
一、ElasticSearch简介1.1什么是ElasticSearchElasticSearch,简称es,es是一个开源的高扩展的分布式全文搜索引擎,可以近乎实时的存储、检索数据;本身扩展性很好,可以扩展到上百台服务器,处理pb级别的数据。es也使用java开发并使用Lucene作为其核心来实现所有索
Stella981 Stella981
3年前
Elasticsearch与Solr优缺点比较
Elasticsearch简介Elasticsearch是一个实时的分布式搜索和分析引擎。它可以帮助你用前所未有的速度去处理大规模数据。它可以用于全文搜索,结构化搜索以及分析,也可以将这三者进行组合。Elasticsearch是一个建立在全文搜索引擎ApacheLucene™基础上的搜索引擎,可以说Lucen
Stella981 Stella981
3年前
Elasticsearch从入门到放弃:瞎说Mapping
前面我们聊了Elasticsearch的索引、搜索和分词器,今天再来聊另一个基础内容——Mapping。Mapping在Elasticsearch中的地位相当于关系型数据库中的schema,它可以用来定义索引中字段的名字、定义字段的数据类型,还可以用来做一些字段的配置。从Elasticsearch7.0开始,Mapping中不在乎需要
Stella981 Stella981
3年前
ElasticSearch学习汇总
什么是ElasticSearch?ElasticSearch是一个基于Lucene的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTfulweb接口。Elasticsearch是用Java语言开发的,并作为Apache许可条款下的开放源码发布,是一种流行的企业级搜索引擎。ElasticSearch能够做到实时搜索并且稳定,
Wesley13 Wesley13
3年前
ELK初探
EKL核心组成1.ElasticSearch开源分布式搜索引擎,他的特点是分布式、零配置、自动发现、索引自动分片,索引副本机制,restful接口,多数据源,自动搜索负载。安装ElasticSearch  高可用,易扩展,支持集群(cluster),分片和复制(sharding和replicas)验证启动:curlXGETht
Stella981 Stella981
3年前
ELK学习笔记之ElasticSearch的索引详解
0x00ElasticSearch的索引和MySQL的索引方式对比Elasticsearch是通过Lucene的倒排索引技术实现比关系型数据库更快的过滤。特别是它对多条件的过滤支持非常好,比如年龄在18和30之间,性别为女性这样的组合查询。倒排索引很多地方都有介绍,但是其比关系型
Stella981 Stella981
3年前
MQ异步同步搜索引擎ElasticSearch数据踩坑
业务背景  在大型网站中,为了减少DB压力、让数据更精准、速度更快,将读拆分出来采用搜索引擎来为DB分担读的压力,ElasticSearch就是目前市面上比较流行的搜索引擎,他的检索速度奇快、支持各种复杂的全文检索,在各种场景下对比其他的搜索引擎的检索速度都显得尤为出众。这篇就先不介绍ElasticSearch了,后续我会出一个ElasticS
京东云开发者 京东云开发者
11个月前
Elasticsearch Mapping类型修改
背景通常数据库进行分库分表后,目前比较常规的作法,是通过将数据异构到Elasticsearch来提供分页列表查询服务;在创建Elasticsearch索引时,基本都是会参考目前的业务需求、关系数据库中的类型以及对数据的相关规划来定义相关字段mapping的
京东云开发者 京东云开发者
8个月前
Elasticearch索引mapping写入、查看、修改
作者:京东物流陈晓娟一、ESElasticsearch是一个流行的开源搜索引擎,它可以将大量数据快速存储和检索。Elasticsearch还提供了强大的实时分析和聚合查询功能,数据模式更加灵活。它不需要预先定义固定的数据结构,可以随时添加或修改数据字段,而