基于Kafka和Elasticsearch构建实时站内搜索功能的实践

京东云开发者
• 阅读 79

作者:京东物流 纪卓志

目前我们在构建一个多租户多产品类网站,为了让用户更好的找到他们所需要的产品,我们需要构建站内搜索功能,并且它应该是实时更新的。本文将会讨论构建这一功能的核心基础设施,以及支持此搜索能力的技术栈。

问题的定义与决策

为了构建一个快速、实时的搜索引擎,我们必须做出某些设计决策。我们使用 MySQL 作为主数据库存储,因此有以下选择:

  1. 直接在 MySQL 数据库中查询用户在搜索框中输入的每个关键词,就像%#{word1}%#{word2}%...这样。 😐
  2. 使用一个高效的搜索数据库,如 Elasticsearch。😮

考虑到我们是一个多租户应用程序,同时被搜索的实体可能需要大量的关联操作(如果我们使用的是 MySQL 一类的关系型数据库),因为不同类型的产品有不同的数据结构,所以我们还可以能需要同时遍历多个数据表来查询用户输入的关键词。所以我们决定不使用直接在 MySQL 中查询关键词的方案。🤯

因此,我们必须决定一种高效、可靠的方式,将数据实时地从 MySQL 迁移到 Elasticsearch 中。接下来需要做出如下的决定:

  1. 使用 Worker 定期查询 MySQL 数据库,并将所有变化的数据发送到 Elasticsearch。😶
  2. 在应用程序中使用 Elasticsearch 客户端,将数据同时写入到 MySQL 和 Elasticsearch 中。🤔
  3. 使用基于事件的流引擎,将 MySQL 数据库中的数据更改作为事件,发送到流处理服务器上,经过处理后将其转发到 Elasticsearch。🥳

选项 1 并不是实时的,所以可以直接排除,而且即使我们缩短轮询间隔,也会造成全表扫描给数据库造成查询压力。除了不是实时的之外,选项 1 无法支持对数据的删除操作,如果对数据进行了删除,那么我们需要额外的表记录之前存在过的数据,这样才能保证用户不会搜索到已经删除了的脏数据。对于其他两种选择,不同的应用场景做出的决定可能会有所不同。在我们的场景中,如果选择选项 2,那么我们可以预见一些问题:如过 Elasticsearch 建立网络连接并确认更新时速度很慢,那么这可能会降低我们应用程序的速度;或者在写入 Elasticsearch 时发生了未知异常,我们该如何对这一操作进行重试来保证数据完整性;不可否认开发团队中不是所有开发人员都能了解所有的功能,如果有开发人员在开发新的与产品有关的业务逻辑时没有引入 Elasticsearch 客户端,那么我们将在 Elasticsearch 中更新这次数据的更改,无法保证 MySQL 与 Elasticsearch 间的数据一致性。

接下来我们该考虑如何将 MySQL 数据库中的数据更改作为事件,发送到流处理服务器上。我们可以在数据库变更后,在应用程序中使用消息管道的客户端同步地将事件发送到消息管道,但是这并没有解决上面提到的使用 Elasticsearch 客户端带来的问题,只不过是将风险从 Elasticsearch 转移到了消息管道。最终我们决定通过采集 MySQL Binlog,将 MySQL Binlog 作为事件发送到消息管道中的方式来实现基于事件的流引擎。关于 binlog 的内容可以点击链接,在这里不再赘述。

服务简介

基于Kafka和Elasticsearch构建实时站内搜索功能的实践

为了对外提供统一的搜索接口,我们首先需要定义用于搜索的数据结构。对于大部分的搜索系统而言,对用户展示的搜索结果通常包括为标题内容,这部分内容我们称之可搜索内容(Searchable Content)。在多租户系统中我们还需要在搜索结果中标示出该搜索结果属于哪个租户,或用来过滤当前租户下可搜索的内容,我们还需要额外的信息来帮助用户筛选自己想要搜索的产品类别,我们将这部分通用的但不用来进行搜索的内容称为元数据(Metadata)。最后,在我们展示搜索结果时可能希望根据不同类型的产品提供不同的展示效果,我们需要在搜索结果中返回这些个性化展示所需要的原始内容(Raw Content)。到此为止我们可以定义出了存储到 Elasticsearch 中的通用数据结构:

{
    "searchable": {
        "title": "string",
        "content": "string"
    },
    "metadata": {
        "tenant_id": "long",
        "type": "long",
        "created_at": "date",
        "created_by": "string",
        "updated_at": "date",
        "updated_by": "string"
    },
    "raw": {}
}

基础设施

Apache Kafka:Apache Kafka 是开源的分布式事件流平台。我们使用 Apache kafka 作为数据库事件(插入、修改和删除)的持久化存储。

mysql-binlog-connector-java:我们使用mysql-binlog-connector-java从 MySQL Binlog 中获取数据库事件,并将它发送到 Apache Kafka 中。我们将单独启动一个服务来完成这个过程。

在接收端我们也将单独启动一个服务来消费 Kafka 中的事件,并对数据进行处理然后发送到 Elasticsearch 中。

Q:为什么不使用Elasticsearch connector之类的连接器对数据进行处理并发送到Elasticsearch中?
A:在我们的系统中是不允许将大文本存入到MySQL中的,所以我们使用了额外的对象存储服务来存放我们的产品文档,所以我们无法直接使用连接器将数据发送到Elasticsearch中。
Q:为什么不在发送到Kafka前就将数据进行处理?
A:这样会有大量的数据被持久化到Kafka中,占用Kafka的磁盘空间,而这部分数据实际上也被存储到了Elasticsearch。
Q:为什么要用单独的服务来采集binlog,而不是使用Filebeat之类的agent?
A:当然可以直接在MySQL数据库中安装agent来直接采集binlog并发送到Kafka中。但是在部分情况下开发者使用的是云服务商或其他基础设施部门提供的MySQL服务器,这种情况下我们无法直接进入服务器安装agent,所以使用更加通用的、无侵入性的C/S结构来消费MySQL的binlog。

配置技术栈

我们使用 docker 和 docker-compose 来配置和部署服务。为了简单起见,MySQL 直接使用了 root 作为用户名和密码,Kafka 和 Elasticsearch 使用的是单节点集群,且没有设置任何鉴权方式,仅供开发环境使用,请勿直接用于生产环境。

version: "3"
services:
  mysql:
    image: mysql:5.7
    container_name: mysql
    environment:
      MYSQL_ROOT_PASSWORD: root
      MYSQL_DATABASE: app
    ports:
      - 3306:3306
    volumes:
      - mysql:/var/lib/mysql
  zookeeper:
    image: bitnami/zookeeper:3.6.2
    container_name: zookeeper
    ports:
      - 2181:2181
    volumes:
      - zookeeper:/bitnami
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: bitnami/kafka:2.7.0
    container_name: kafka
    ports:
      - 9092:9092
    volumes:
      - kafka:/bitnami
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.11.0
    container_name: elasticsearch
    environment:
      - discovery.type=single-node
    volumes:
      - elasticsearch:/usr/share/elasticsearch/data
    ports:
      - 9200:9200
volumes:
  mysql:
    driver: local
  zookeeper:
    driver: local
  kafka:
    driver: local
  elasticsearch:
    driver: local

在服务启动成功后我们需要为 Elasticsearch 创建索引,在这里我们直接使用 curl 调用 Elasticsearch 的 RESTful API,也可以使用 busybox 基础镜像创建服务来完成这个步骤。

# Elasticsearch
curl "http://localhost:9200/search" -XPUT -d '
{
  "mappings": {
    "properties": {
      "searchable": {
        "type": "nested",
        "properties": {
          "title": {
            "type": "text"
          },
          "content": {
            "type": "text"
          }
        }
      },
      "metadata": {
        "type": "nested",
        "properties": {
          "tenant_id": {
            "type": "long"
          },
          "type": {
            "type": "integer"
          },
          "created_at": {
            "type": "date"
          },
          "created_by": {
            "type": "keyword"
          },
          "updated_at": {
            "type": "date"
          },
          "updated_by": {
            "type": "keyword"
          }
        }
      },
      "raw": {
        "type": "nested"
      }
    }
  }
}'

核心代码实现(SpringBoot + Kotlin)

Binlog 采集端:

    override fun run() {
        client.serverId = properties.serverId
        val eventDeserializer = EventDeserializer()
        eventDeserializer.setCompatibilityMode(
            EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG
        )
        client.setEventDeserializer(eventDeserializer)
        client.registerEventListener {
            val header = it.getHeader<EventHeader>()
            val data = it.getData<EventData>()
            if (header.eventType == EventType.TABLE_MAP) {
                tableRepository.updateTable(Table.of(data as TableMapEventData))
            } else if (EventType.isRowMutation(header.eventType)) {
                val events = when {
                    EventType.isWrite(header.eventType) -> mapper.map(data as WriteRowsEventData)
                    EventType.isUpdate(header.eventType) -> mapper.map(data as UpdateRowsEventData)
                    EventType.isDelete(header.eventType) -> mapper.map(data as DeleteRowsEventData)
                    else -> emptyList()
                }
                logger.info("Mutation events: {}", events)
                for (event in events) {
                    kafkaTemplate.send("binlog", objectMapper.writeValueAsString(event))
                }
            }
        }
        client.connect()
    }

在这段代码里面,我们首先是对 binlog 客户端进行了初始化,随后开始监听 binlog 事件。binlog 事件类型有很多,大部分都是我们不需要关心的事件,我们只需要关注 TABLE_MAP 和 WRITE/UPDATE/DELETE 就可以。当我们接收到 TABLE_MAP 事件,我们会对内存中的数据库表结构进行更新,在后续的 WRITE/UPDATE/DELETE 事件中,我们会使用内存缓存的数据库结构进行映射。整个过程大概如下所示:

Table: ["id", "title", "content",...]
Row: [1, "Foo", "Bar",...]
=>
{
    "id": 1,
    "title": "Foo",
    "content": "Bar"
}

随后我们将收集到的事件发送到 Kafka 中,并由 Event Processor 进行消费处理。

事件处理器

@Component
class KafkaBinlogTopicListener(
    val binlogEventHandler: BinlogEventHandler
) {

    companion object {
        private val logger = LoggerFactory.getLogger(KafkaBinlogTopicListener::class.java)
    }

    private val objectMapper = jacksonObjectMapper()

    @KafkaListener(topics = ["binlog"])
    fun process(message: String) {
        val binlogEvent = objectMapper.readValue<BinlogEvent>(message)
        logger.info("Consume binlog event: {}", binlogEvent)
        binlogEventHandler.handle(binlogEvent)
    }
}

首先使用SpringBoot Message Kafka提供的注解对事件进行消费,接下来将事件委托到binlogEventHandler去进行处理。实际上BinlogEventHandler是个自定义的函数式接口,我们自定义事件处理器实现该接口后通过 Spring Bean 的方式注入到KafkaBinlogTopicListener中。

@Component
class ElasticsearchIndexerBinlogEventHandler(
    val restHighLevelClient: RestHighLevelClient
) : BinlogEventHandler {
    override fun handle(binlogEvent: BinlogEvent) {
        val payload = binlogEvent.payload as Map<*, *>
        val documentId = "${binlogEvent.database}_${binlogEvent.table}_${payload["id"]}"
        // Should delete from Elasticsearch
        if (binlogEvent.eventType == EVENT_TYPE_DELETE) {
            val deleteRequest = DeleteRequest()
            deleteRequest
                .index("search")
                .id(documentId)
            restHighLevelClient.delete(deleteRequest, DEFAULT)
        } else {
            // Not ever WRITE or UPDATE, just reindex
            val indexRequest = IndexRequest()
            indexRequest
                .index("search")
                .id(documentId)
                .source(
                    mapOf<String, Any>(
                        "searchable" to mapOf(
                            "title" to payload["title"],
                            "content" to payload["content"]
                        ),
                        "metadata" to mapOf(
                            "tenantId" to payload["tenantId"],
                            "type" to payload["type"],
                            "createdAt" to payload["createdAt"],
                            "createdBy" to payload["createdBy"],
                            "updatedAt" to payload["updatedAt"],
                            "updatedBy" to payload["updatedBy"]
                        )
                    )
                )
            restHighLevelClient.index(indexRequest, DEFAULT)
        }
    }
}

在这里我们只需要简单地判断是否为删除操作就可以,如果是删除操作需要在 Elasticsearch 中将数据删除,而如果是非删除操作只需要在 Elasticsearch 重新按照为文档建立索引即可。这段代码简单地使用了 Kotlin 中提供的 mapOf 方法对数据进行映射,如果需要其他复杂的处理只需要按照 Java 代码的方式编写处理器即可。

总结

其实 Binlog 的处理部分有很多开源的处理引擎,包括 Alibaba Canal,本文使用手动处理的方式也是为其他使用非 MySQL 数据源的同学类似的解决方案。大家可以按需所取,因地制宜,为自己的网站设计属于自己的实时站内搜索引擎!

点赞
收藏
评论区
推荐文章
冴羽 冴羽
1年前
VuePress 博客优化之开启 Algolia 全文搜索
前言在中,我们使用VuePress搭建了一个博客,最终的效果查看:。由于VuePress的内置搜索只会为页面的标题、h2、h3以及tags构建搜索索引。如果你需要全文搜索,可则以使用Algolia搜索,本篇讲讲如何申请以及配置Algolia搜索。AlgoliaAlgolia是一个数据库实时搜索服务,能够提供毫秒级的数据库搜
Stella981 Stella981
1年前
Demo:基于 Flink SQL 构建流式应用
Flink1.10.0于近期刚发布,释放了许多令人激动的新特性。尤其是FlinkSQL模块,发展速度非常快,因此本文特意从实践的角度出发,带领大家一起探索使用FlinkSQL如何快速构建流式应用。本文将基于Kafka,MySQL,Elasticsearch,Kibana,使用FlinkSQL构建一个电商用户行为的实时分析应用
Stella981 Stella981
1年前
KubeSphere DevOps 初体验,内置 Jenkins 引擎
KubeSphere是在Kubernetes之上构建的以应用为中心的多租户容器平台,提供全栈的IT自动化运维的能力,简化企业的DevOps工作流。KubeSphere提供了运维友好的向导式操作界面,帮助企业快速构建一个强大和功能丰富的容器云平台。KubeSphere支持部署在任何基础设施环境,提供在线与离线安装,支持一键升级与扩容集群,并且
Stella981 Stella981
1年前
Kafka概述及安装部署
一、Kafka概述1.Kafka是一个分布式流媒体平台,它有三个关键功能:(1)发布和订阅记录流,类似于消息队列或企业消息传递系统;(2)以容错的持久方式存储记录流;(3)记录发送时处理流。2.Kafka通常应用的两大类应用(1)构建在系统或应用程序之间的可靠获取数据的实时流数据管道;(2)构建转换或响应数据流的实施
Stella981 Stella981
1年前
071. ElasticSearch 应用场景及核心概念
1\.ES使用场景给网站/APP添加搜索功能。存储、分析数据。管理、交互、分析空间信息,将ES用于GIS。2\.ES简介Elasticsearch是一个基于Lucene构建的开源、分布式、RESTful接口全文检索引擎。Elast
可莉 可莉
1年前
071. ElasticSearch 应用场景及核心概念
1\.ES使用场景给网站/APP添加搜索功能。存储、分析数据。管理、交互、分析空间信息,将ES用于GIS。2\.ES简介Elasticsearch是一个基于Lucene构建的开源、分布式、RESTful接口全文检索引擎。Elast
Wesley13 Wesley13
1年前
5分钟Serverless实践:构建无服务器的图片分类系统
前 言在过去“5分钟Serverless实践”系列文章中,我们介绍了如何构建无服务器API和Web应用,从本质上来说,它们都属于基于APIG触发器对外提供一个无服务器API的场景。现在本文将介绍一种新的设计模式:基于事件的实时数据处理。为了更形象地描述,我们以图片分类为例,先介绍通过APIG触发器如何构建一个图片分类的Web应
Stella981 Stella981
1年前
Kafka 简介
Kafka简介_Kafka是分布式流平台。_一个流平台有3个主要特征:发布和订阅消息流,这一点与传统的消息队列相似。以容灾持久化方式的消息流存储。在消息流发生时处理消息流。Kafka通常使用在两大类应用中:在系统或应用之间,构建实时、可靠的消息流管道。构建实时流应用
小万哥 小万哥
4个月前
CMake vs Makefile: 如何选择适合你的项目构建工具
在软件开发中,构建(build)是一个非常重要的过程。我们需要将源代码转换为可执行文件或库文件。为了完成此过程,我们通常使用构建工具来自动化构建过程。CMake和Makefile都是用于构建和管理软件项目的工具。CMake是一个跨平台的构建工具,它可以自动
子桓 子桓
3个月前
好用的Java开发推荐!
好用的Java开发IntelliJIDEA2023中文,IntelliJIDEA提供了丰富的工具和功能,可以帮助开发人员提高开发效率和代码质量。它具有智能代码编辑器、代码检查、快速修复、多模块构建、重构、版本控制等功能。此外,它还支持自动化构建、测试和部署