ElasticSearch(十):springboot集成ElasticSearch集群完成数据的增,删,改

Stella981
• 阅读 458

前言

之前介绍了使用devTools进行索引库数据的crud,这里使用的是java程序,使用中间件activeMQ进行数据库和索引库数据的同步。主要是用来完成对数据库的修改来完成对索引库的同步。

正文

前提准备:

1. 索引信息:

结构化的索引,在索引的setting中,使用的是ik分词器,级别是ik-max-word。

mapping映射信息中,使用的dynamic = false,如果不能匹配到结构化索引的字段,则不进行数据的添加和更新。这样的好处是:不添加多余的不需要索引的字段,并且如果添加的字段多的话可以过滤掉无用的而不报错。对数据库添加的很多字段,只添加需要索引的结构化的映射信息。

2. elasticsearch信息

使用的是一台机器上的三个elasticsearch服务端的集群,分别使用的是9301,9300,9302的tcp端口。

创建项目进行数据库的操作

1. 对数据的删除

ElasticSearch(十):springboot集成ElasticSearch集群完成数据的增,删,改 ElasticSearch(十):springboot集成ElasticSearch集群完成数据的增,删,改

boolean flag = bookMapper.deleteByPrimaryKey(book.getId()) > 0 ? true : false;
        logger.info("删除数据返回的结果:" + flag);
        //将消息放入队列中,esQueueDelete:发送消息的类型,消息内容:book.getId().toString()
        if (flag) {
            producer.sendMessage(esQueueDelete, book.getId().toString());
        }
        return flag;

View Code

2. 对数据的修改

ElasticSearch(十):springboot集成ElasticSearch集群完成数据的增,删,改 ElasticSearch(十):springboot集成ElasticSearch集群完成数据的增,删,改

boolean flag = bookMapper.updateByPrimaryKeySelective(book) > 0 ? true : false;
        logger.info("更新数据返回的结果:" + flag);
        //将消息放入队列中,esQueueUpdate:消息的类型,消息体:将更新的数据转换成json放入消息体
        if (flag) {
            producer.sendMessage(esQueueUpdate,JSONObject.toJSONStringWithDateFormat(book, Constant.DATETIME_FORMATTION));
        }
        return flag;

View Code

3. 对数据的添加

ElasticSearch(十):springboot集成ElasticSearch集群完成数据的增,删,改 ElasticSearch(十):springboot集成ElasticSearch集群完成数据的增,删,改

boolean flag = bookMapper.insertSelective(book) > 0 ? true : false;
        logger.info("插入的数据返回的id是{}", book.getId());
        /**
         * 将数据插入到消息队列中。
         * 如果索引创建的时候dynamic字段是false,可以直接传整对象,
         * 如果不是,请自行过滤之后传入对象,否则会报错或者添加需要无用字段并被索引。
         */
        if (flag) {
            producer.sendMessage(esQueueCreate, JSONObject.toJSONStringWithDateFormat(book, Constant.DATETIME_FORMATTION));
        }
        return flag;

View Code

ES项目接受消息进行索引库操作

ElasticSearch(十):springboot集成ElasticSearch集群完成数据的增,删,改 ElasticSearch(十):springboot集成ElasticSearch集群完成数据的增,删,改

/**
     * 根据activeMQ消息订阅,自动将数据插入到索引库
     * @param receiveMsg 监听接收到的消息
     */
    @JmsListener(destination = JmsConfig.QUEUE_CREATE,containerFactory = "jmsListenerContainerQueue")
    public void autoInsertData(String receiveMsg){
        if (StringUtils.isBlank(receiveMsg)){
            logger.error("自动插入数据到索引库====>从消息队列中接收到的消息为空。监听的为:{}", JmsConfig.QUEUE_CREATE);
        }
        logger.info("自动插入数据到索引库====>从消息队列中接收到的消息为:{}", receiveMsg);
        String id = dataService.insertData(receiveMsg, defaultIndex, defaultType);
        logger.info("时间:{},监听消息队列:{},插入到索引库成功,id为:{}",LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE), JmsConfig.QUEUE_CREATE, id);
    }

View Code

需要注意的是:

   @JmsListener(destination = JmsConfig.QUEUE_CREATE,containerFactory = "jmsListenerContainerQueue"),其中:JmsConfig.QUEUE_CREATE必须是常量,表示监听的消息类型。使用的是containerFactory 是jmsListenerContainerQueue

  直接拿到receiveMsg,就是接收的消息体。

具体插入索引库代码

ElasticSearch(十):springboot集成ElasticSearch集群完成数据的增,删,改 ElasticSearch(十):springboot集成ElasticSearch集群完成数据的增,删,改

@Override
    public String insertData(String content, String index, String type) {
        JSONObject jsonObject = JSONObject.parseObject(content);
        String id = jsonObject.get(primaryKey).toString();
        IndexResponse indexResponse = transportClient.prepareIndex(index, type, id).setSource(jsonObject.toJSONString(), XContentType.JSON).get();
        logger.info("插入数据库的数据是:{},插入索引库返回状态:{},插入的id为{}",content, indexResponse.status(), indexResponse.getId());
        return indexResponse.getId();
    }

View Code

删除和更新类似,直接上代码了。

ElasticSearch(十):springboot集成ElasticSearch集群完成数据的增,删,改 ElasticSearch(十):springboot集成ElasticSearch集群完成数据的增,删,改

/**
     * 根据activeMQ消息订阅,自动将数据插入到索引库
     * @param receiveMsg 监听接收到的消息
     */
    @JmsListener(destination = JmsConfig.QUEUE_CREATE,containerFactory = "jmsListenerContainerQueue")
    public void autoInsertData(String receiveMsg){
        if (StringUtils.isBlank(receiveMsg)){
            logger.error("自动插入数据到索引库====>从消息队列中接收到的消息为空。监听的为:{}", JmsConfig.QUEUE_CREATE);
        }
        logger.info("自动插入数据到索引库====>从消息队列中接收到的消息为:{}", receiveMsg);
        String id = dataService.insertData(receiveMsg, defaultIndex, defaultType);
        logger.info("时间:{},监听消息队列:{},插入到索引库成功,id为:{}",LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE), JmsConfig.QUEUE_CREATE, id);
    }

    /**
     * 根据订阅的消息,自动从索引库删除数据
     * @param receiveMsg
     */
    @JmsListener(destination = JmsConfig.QUEUE_DELETE,containerFactory = "jmsListenerContainerQueue")
    public void autoDeleteData(String receiveMsg){
        if (StringUtils.isBlank(receiveMsg)){
            logger.error("自动从索引库删除数据===>从消息队列中接收到的消息为空。监听的为:{}", JmsConfig.QUEUE_DELETE);
        }
        boolean flag = dataService.deleteData(receiveMsg, defaultIndex, defaultType);
        logger.info("自动从索引库删除数据===>从消息队列中接收到的消息为:{},删除结果为:{}", receiveMsg, flag);
    }

    /**
     * 根据订阅的消息,自动从索引库更新数据
     * @param receiveMsg
     */
    @JmsListener(destination = JmsConfig.QUEUE_UPDATE,containerFactory = "jmsListenerContainerQueue")
    public void autoUpdateData(String receiveMsg){
        if (StringUtils.isBlank(receiveMsg)){
            logger.error("自动从索引库更新数据===>从消息队列中接收到的消息为空。监听的为:{}", JmsConfig.QUEUE_DELETE);
        }
        logger.info("自动更新数据到索引库====>从消息队列中接收到的消息为:{}", receiveMsg);
        boolean flag = dataService.updateData(receiveMsg, defaultIndex, defaultType);
        logger.info("自动从索引库更新数据===>从消息队列中接收到的消息为:{},更新结果为:{}", receiveMsg, flag);
    }

    @Override
    public String insertData(String content, String index, String type) {
        JSONObject jsonObject = JSONObject.parseObject(content);
        String id = jsonObject.get(primaryKey).toString();
        IndexResponse indexResponse = transportClient.prepareIndex(index, type, id).setSource(jsonObject.toJSONString(), XContentType.JSON).get();
        logger.info("插入数据库的数据是:{},插入索引库返回状态:{},插入的id为{}",content, indexResponse.status(), indexResponse.getId());
        return indexResponse.getId();
    }

    @Override
    public boolean deleteData(String receiveMsg, String index, String type) {
        DeleteResponse deleteResponse = transportClient.prepareDelete(index, type, receiveMsg).execute().actionGet();
        logger.info("删除索引的结果为:{},删除的索引数据的id为:{}", deleteResponse.status().getStatus(), deleteResponse.getId());
        return deleteResponse.status().getStatus() == 200 ? true : false;
    }

    @Override
    public boolean updateData(String receiveMsg, String defaultIndex, String defaultType) {
        JSONObject jsonObject = JSONObject.parseObject(receiveMsg);
        String id = jsonObject.get(primaryKey).toString();
        UpdateRequest updateRequest = new UpdateRequest().index(defaultIndex).type(defaultType).id(id).doc(jsonObject.toJSONString(), XContentType.JSON);
        UpdateResponse updateResponse = transportClient.update(updateRequest).actionGet();
        int status = updateResponse.status().getStatus();
        logger.info("更新索引结果:{},更新之后为:{}", status, receiveMsg);
        return status == 200 ? true : false;
    }

View Code

到目前为止,已经可以进行操作数据库同步进行索引库的操作了。但是还是存在很多问题的,主要在于activeMQ的一些问题。以后在解决。

点赞
收藏
评论区
推荐文章
blmius blmius
2年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
添砖java的啾 添砖java的啾
2年前
distinct效率更高还是group by效率更高?
目录00结论01distinct的使用02groupby的使用03distinct和groupby原理04推荐groupby的原因00结论先说大致的结论(完整结论在文末):在语义相同,有索引的情况下groupby和distinct都能使用索引,效率相同。在语义相同,无索引的情况下:distinct效率高于groupby。原因是di
Jacquelyn38 Jacquelyn38
2年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Stella981 Stella981
2年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Stella981 Stella981
2年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
2年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Stella981 Stella981
2年前
ELK学习笔记之ElasticSearch的索引详解
0x00ElasticSearch的索引和MySQL的索引方式对比Elasticsearch是通过Lucene的倒排索引技术实现比关系型数据库更快的过滤。特别是它对多条件的过滤支持非常好,比如年龄在18和30之间,性别为女性这样的组合查询。倒排索引很多地方都有介绍,但是其比关系型
为什么mysql不推荐使用雪花ID作为主键
作者:毛辰飞背景在mysql中设计表的时候,mysql官方推荐不要使用uuid或者不连续不重复的雪花id(long形且唯一),而是推荐连续自增的主键id,官方的推荐是auto_increment,那么为什么不建议采用uuid,使用uuid究
Python进阶者 Python进阶者
3个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这