springboot1.5.10兼容高版本6.1.1elasticsearch

Easter79
• 阅读 151

1.引入依赖

<dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>${elasticsearch.version}</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>transport</artifactId>
            <version>${elasticsearch.version}</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.plugin</groupId>
            <artifactId>transport-netty4-client</artifactId>
            <version>${elasticsearch.version}</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>${elasticsearch.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.0</version>
        </dependency>

2.配置信息:

/**
 * 读取client配置信息
 * @author 
 *
 */
@Configuration
@Getter
@Setter
public class ClientConfig {
    
    /** 
     * elk集群地址 
     */  
    @Value("${elasticsearch.ip}")
    private String esHostName;  
    /** 
     * 端口 
     */  
    @Value("${elasticsearch.port}")
    private Integer esPort;  
    /** 
     * 集群名称 
     */  
    @Value("${elasticsearch.cluster.name}")
    private String esClusterName;  
  
    /** 
     * 连接池 
     */  
    @Value("${elasticsearch.pool}")
    private Integer esPoolSize;  
  
    
    /** 
     * 是否服务启动时重新创建索引
     */  
    @Value("${elasticsearch.regenerateIndexEnabled}")
    private Boolean esRegenerateIndexFlag; 
    
    
    /** 
     * 是否服务启动时索引数据同步
     */  
    @Value("${elasticsearch.syncDataEnabled}")
    private Boolean esSyncDataEnabled; 
}

3.es配置启动类:

import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.net.InetAddress;

/**
 * es配置启动类
 * @author
 *
 */
@Configuration
public class ElasticsearchConfig {
    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchConfig.class);
    
    @Autowired
    ClientConfig clientConfig;
    
    @Bean
    public TransportClient init() {
        LOGGER.info("初始化开始。。。。。");  
        TransportClient transportClient = null;
  
        try {  
            /**
             *  配置信息 
             *  client.transport.sniff   增加嗅探机制,找到ES集群 
             *  thread_pool.search.size  增加线程池个数,暂时设为5  
             */
            Settings esSetting = Settings.builder()
                    .put("client.transport.sniff", true) 
                    .put("thread_pool.search.size", clientConfig.getEsPoolSize())
                    .build();  
            //配置信息Settings自定义
            transportClient = new PreBuiltTransportClient(esSetting);
            TransportAddress transportAddress = new TransportAddress(InetAddress.getByName(clientConfig.getEsHostName()), clientConfig.getEsPort());
            transportClient.addTransportAddresses(transportAddress);  
  
  
        } catch (Exception e) {  
            LOGGER.error("elasticsearch TransportClient create error!!!", e);  
        }  
  
        return transportClient;  
    }  
    
    
}

4.操作工具类:

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetRequestBuilder;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.io.ClassPathResource;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.io.InputStream;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;


public class ElasticsearchUtils {

    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchUtils.class);

    @Autowired
    private TransportClient transportClient;

    private static TransportClient client;

    @PostConstruct
    public void init() {
        client = this.transportClient;
    }

    /**
     * 创建索引以及设置其内容
     * @param index
     * @param indexType
     * @param filePath:json文件路径
     */
    public static void createIndex(String index,String indexType,String filePath) throws RuntimeException {
        try {
                StringBuffer strBuf = new StringBuffer();
                //解析json配置
                ClassPathResource resource = new ClassPathResource(filePath);
                InputStream inputStream = resource.getInputStream();

                int len = 0;
                byte[] buf = new byte[1024];
                while((len=inputStream.read(buf)) != -1) {
                    strBuf.append(new String(buf, 0, len, "utf-8"));
                }
                inputStream.close();
                //创建索引
                createIndex(index);
                //设置索引元素
                putMapping(index, indexType, strBuf.toString());

        }catch(Exception e){
            throw new RuntimeException(e.getMessage());
        }
    }


        /**
         * 创建索引
         *
         * @param index 索引名称
         * @return
         */
        public static boolean createIndex(String index){

            try {
                if (isIndexExist(index)) {
                    //索引库存在则删除索引
                    deleteIndex(index);
                }
                CreateIndexResponse indexresponse = client.admin().indices().prepareCreate(index).setSettings(Settings.builder().put("index.number_of_shards", 5)
                        .put("index.number_of_replicas", 1)
                )
                        .get();
                LOGGER.info("创建索引 {} 执行状态 {}", index , indexresponse.isAcknowledged());

                return indexresponse.isAcknowledged();
            }catch (Exception e) {
                throw new RuntimeException(e.getMessage());
            }

        }


    /**
     * 创建索引
     *
     * @param index 索引名称
     * @param indexType 索引类型
     * @param mapping 创建的mapping结构
     * @return
     */
    public static boolean putMapping(String index,String indexType,String mapping) throws RuntimeException {
        if (!isIndexExist(index)) {
            throw new RuntimeException("创建索引库"+index+"mapping"+mapping+"结构失败,索引库不存在!");
        }
        try {
            PutMappingResponse indexresponse = client.admin().indices().preparePutMapping(index).setType(indexType).setSource(mapping, XContentType.JSON).get();

            LOGGER.info("索引 {} 设置 mapping {} 执行状态 {}", index ,indexType, indexresponse.isAcknowledged());

            return indexresponse.isAcknowledged();
        }catch (Exception e) {
            throw new RuntimeException(e.getMessage());
        }


    }

    /**
     * 判断索引是否存在
     *
     * @param index
     * @return
     */
    public static boolean isIndexExist(String index) {
        IndicesExistsResponse inExistsResponse = client.admin().indices().exists(new IndicesExistsRequest(index))
                .actionGet();
        return inExistsResponse.isExists();
    }


    /**
     * 删除索引
     *
     * @param index
     * @return
     */
    public static boolean deleteIndex(String index) throws RuntimeException{
        if (!isIndexExist(index)) {
            return true;
        }
        try {
            DeleteIndexResponse dResponse = client.admin().indices().prepareDelete(index).execute().actionGet();
            if (dResponse.isAcknowledged()) {
                LOGGER.info("delete index " + index + "  successfully!");
            } else {
                LOGGER.info("Fail to delete index " + index);
            }
            return dResponse.isAcknowledged();
        } catch (Exception e) {

            throw new RuntimeException(e.getMessage());
        }
    }


    /**
     * 数据添加
     *
     * @param jsonObject
     *            要增加的数据
     * @param index
     *            索引,类似数据库
     * @param type
     *            类型,类似表
     * @return
     */
    public static String addData(JSONObject jsonObject, String index, String type) {
        return addData(jsonObject, index, type, UUID.randomUUID().toString().replaceAll("-", "").toUpperCase());
    }

    /**
     * 数据添加,正定ID
     *
     * @param jsonObject
     *            要增加的数据
     * @param index
     *            索引,类似数据库
     * @param type
     *            类型,类似表
     * @param id
     *            数据ID
     * @return
     */
    public static String addData(JSONObject jsonObject, String index, String type, String id)throws RuntimeException {
        try {
            IndexResponse response = client.prepareIndex(index, type, id).setSource(jsonObject).get();

            LOGGER.info("addData response status:{},id:{}", response.status().getStatus(), response.getId());

            return response.getId();
        } catch (Exception e) {
            throw new RuntimeException(e.getMessage());
        }
    }


    /**
     * 批量数据添加,
     *
     * @param list
     *            要增加的数据
     * @param pkName
     *            主键id
     * @param index
     *            索引,类似数据库
     * @param type
     *            类型,类似表
     * @return
     */
    public static <T> void addBatchData(List<T> list, String pkName, String index, String type) {
        if(list == null || list.isEmpty()) {
            return;
        }
        // 创建BulkPorcessor对象
        BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long paramLong, BulkRequest paramBulkRequest) {
                // TODO Auto-generated method stub
            }

            // 执行出错时执行
            @Override
            public void afterBulk(long paramLong, BulkRequest paramBulkRequest, Throwable paramThrowable) {
                // TODO Auto-generated method stub
            }
            @Override
            public void afterBulk(long paramLong, BulkRequest paramBulkRequest, BulkResponse paramBulkResponse) {
                // TODO Auto-generated method stub
            }
        })
                // 1w次请求执行一次bulk
                .setBulkActions(1000)
                // 1gb的数据刷新一次bulk
                // .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
                // 固定5s必须刷新一次
                .setFlushInterval(TimeValue.timeValueSeconds(5))
                // 并发请求数量, 0不并发, 1并发允许执行
                .setConcurrentRequests(1)
                // 设置退避, 100ms后执行, 最大请求3次
                .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)).build();

        for (T vo : list) {
            if(getPkValueByName(vo, pkName)!= null) {
                String id = getPkValueByName(vo, pkName).toString();
                bulkProcessor.add(new IndexRequest(index, type, id).source(JSON.toJSONString(vo), XContentType.JSON));
            }

        }
        bulkProcessor.close();
    }

    /**
     * 根据主键名称获取实体类主键属性值
     *
     * @param clazz
     * @param pkName
     * @return
     */
    private static Object getPkValueByName(Object clazz, String pkName) {
        try {
            String firstLetter = pkName.substring(0, 1).toUpperCase();
            String getter = "get" + firstLetter + pkName.substring(1);
            Method method = clazz.getClass().getMethod(getter, new Class[] {});
            Object value = method.invoke(clazz, new Object[] {});
            return value;
        } catch (Exception e) {
            return null;
        }
    }


    /**
     * 通过ID 更新数据
     *
     * @param jsonObject
     *            要增加的数据
     * @param index
     *            索引,类似数据库
     * @param type
     *            类型,类似表
     * @param id
     *            数据ID
     * @return
     */
    public static void updateDataById(JSONObject jsonObject, String index, String type, String id) throws RuntimeException {

        try{
            UpdateRequest updateRequest = new UpdateRequest();

            updateRequest.index(index).type(type).id(id).doc(jsonObject);

            client.update(updateRequest);
        } catch (Exception e) {
            throw new RuntimeException(e.getMessage());
        }
    }

    /**
     * 批量数据更新,
     *
     * @param list
     *            要增加的数据
     * @param pkName
     *            主键id
     * @param index
     *            索引,类似数据库
     * @param type
     *            类型,类似表
     * @return
     */
    public static <T> void updateBatchData(List<T> list, String pkName, String index, String type) {
        // 创建BulkPorcessor对象
        BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long paramLong, BulkRequest paramBulkRequest) {
                // TODO Auto-generated method stub
            }

            // 执行出错时执行
            @Override
            public void afterBulk(long paramLong, BulkRequest paramBulkRequest, Throwable paramThrowable) {
                // TODO Auto-generated method stub
            }
            @Override
            public void afterBulk(long paramLong, BulkRequest paramBulkRequest, BulkResponse paramBulkResponse) {
                // TODO Auto-generated method stub
            }
        })
                // 1w次请求执行一次bulk
                .setBulkActions(1000)
                // 1gb的数据刷新一次bulk
                // .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
                // 固定5s必须刷新一次
                .setFlushInterval(TimeValue.timeValueSeconds(5))
                // 并发请求数量, 0不并发, 1并发允许执行
                .setConcurrentRequests(1)
                // 设置退避, 100ms后执行, 最大请求3次
                .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)).build();

        for (T vo : list) {
            String id = getPkValueByName(vo, pkName).toString();
            bulkProcessor.add(new UpdateRequest(index, type, id).doc(JSON.toJSONString(vo), XContentType.JSON));
        }
        bulkProcessor.close();
    }


    /**
     * 通过ID获取数据
     *
     * @param index
     *            索引,类似数据库
     * @param type
     *            类型,类似表
     * @param id
     *            数据ID
     * @param fields
     *            需要显示的字段,逗号分隔(缺省为全部字段)
     * @return
     */
    public static Map<String, Object> searchDataById(String index, String type, String id, String fields) {

        GetRequestBuilder getRequestBuilder = client.prepareGet(index, type, id);

        if (StringUtils.isNotEmpty(fields)) {
            getRequestBuilder.setFetchSource(fields.split(","), null);
        }

        GetResponse getResponse = getRequestBuilder.execute().actionGet();

        return getResponse.getSource();
    }

    /**
     * 使用分词查询
     *
     * @param index
     *            索引名称
     * @param type
     *            类型名称,可传入多个type逗号分隔
     * @param clz
     *            数据对应实体类
     * @param fields
     *            需要显示的字段,逗号分隔(缺省为全部字段)
     * @param boolQuery
     *            查询条件
     * @return
     */
    public static <T> List<T> searchListData(String index, String type, Class<T> clz, String fields,BoolQueryBuilder boolQuery) {
        return searchListData(index, type, clz, 0, fields, null,  null,boolQuery);
    }

    /**
     * 使用分词查询
     *
     * @param index
     *            索引名称
     * @param type
     *            类型名称,可传入多个type逗号分隔
     * @param clz
     *            数据对应实体类
     * @param size
     *            文档大小限制
     * @param fields
     *            需要显示的字段,逗号分隔(缺省为全部字段)
     * @param sortField
     *            排序字段
     * @param highlightField
     *            高亮字段
     * @param boolQuery
     *            查询条件
     * @return
     */
    public static <T> List<T> searchListData(String index, String type, Class<T> clz,
                                             Integer size, String fields, String sortField, String highlightField,BoolQueryBuilder boolQuery) throws RuntimeException{

        SearchRequestBuilder searchRequestBuilder = client.prepareSearch(index);
        if (StringUtils.isNotEmpty(type)) {
            searchRequestBuilder.setTypes(type.split(","));
        }
        // 高亮(xxx=111,aaa=222)
        if (StringUtils.isNotEmpty(highlightField)) {
            HighlightBuilder highlightBuilder = new HighlightBuilder();
            // 设置高亮字段
            highlightBuilder.field(highlightField);
            searchRequestBuilder.highlighter(highlightBuilder);
        }
        searchRequestBuilder.setQuery(boolQuery);
        if (StringUtils.isNotEmpty(fields)) {
            searchRequestBuilder.setFetchSource(fields.split(","), null);
        }
        searchRequestBuilder.setFetchSource(true);

        if (StringUtils.isNotEmpty(sortField)) {
            searchRequestBuilder.addSort(sortField, SortOrder.DESC);
        }
        if (size != null && size > 0) {
            searchRequestBuilder.setSize(size);
        }
        searchRequestBuilder.setScroll(new TimeValue(1000));
        searchRequestBuilder.setSize(10000);
        // 打印的内容 可以在 Elasticsearch head 和 Kibana 上执行查询
        LOGGER.info("\n{}", searchRequestBuilder);

        SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();

        long totalHits = searchResponse.getHits().totalHits;
        if(LOGGER.isDebugEnabled()) {
            long length = searchResponse.getHits().getHits().length;

            LOGGER.info("共查询到[{}]条数据,处理数据条数[{}]", totalHits, length);
        }


        if (searchResponse.status().getStatus() ==200) {
            // 解析对象
            return setSearchResponse(clz, searchResponse, highlightField);
        }

        return null;
    }


    /**
     * 高亮结果集 特殊处理
     *
     * @param clz
     *            数据对应实体类
     * @param searchResponse
     *
     * @param highlightField
     *            高亮字段
     */
    private static <T> List<T> setSearchResponse(Class<T> clz, SearchResponse searchResponse, String highlightField) {
        List<T> sourceList = new ArrayList<T>();
        for (SearchHit searchHit : searchResponse.getHits().getHits()) {
            searchHit.getSourceAsMap().put("id", searchHit.getId());
            StringBuffer stringBuffer = new StringBuffer();
            if (StringUtils.isNotEmpty(highlightField)) {

                // System.out.println("遍历 高亮结果集,覆盖 正常结果集" + searchHit.getSourceAsMap());
                HighlightField highlight = searchHit.getHighlightFields().get(highlightField);
                if(highlight == null) {
                    continue;
                }
                Text[] text = highlight.getFragments();
                if (text != null) {
                    for (Text str : text) {
                        stringBuffer.append(str.string());
                    }
                    // 遍历 高亮结果集,覆盖 正常结果集
                    searchHit.getSourceAsMap().put(highlightField, stringBuffer.toString());
                }
            }

            T t = JSON.parseObject(JSON.toJSONString(searchHit.getSourceAsMap()), clz);
            sourceList.add(t);
        }

        return sourceList;
    }

}
点赞
收藏
评论区
推荐文章
秃头王路飞 秃头王路飞
4个月前
webpack5手撸vue2脚手架
webpack5手撸vue相信工作个12年的小伙伴们在面试的时候多多少少怕被问到关于webpack方面的知识,本菜鸟最近闲来无事,就尝试了手撸了下vue2的脚手架,第一次发帖实在是没有经验,望海涵。languageJavaScript"name":"vuecliversion2","version":"1.0.0","desc
浅梦一笑 浅梦一笑
4个月前
初学 Python 需要安装哪些软件?超级实用,小白必看!
编程这个东西是真的奇妙。对于懂得的人来说,会觉得这个工具是多么的好用、有趣,而对于小白来说,就如同大山一样。其实这个都可以理解,大家都是这样过来的。那么接下来就说一下python相关的东西吧,并说一下我对编程的理解。本人也是小白一名,如有不对的地方,还请各位大神指出01名词解释:如果在编程方面接触的比较少,那么对于软件这一块,有几个名词一定要了解,比如开发环
技术小男生 技术小男生
4个月前
linux环境jdk环境变量配置
1:编辑系统配置文件vi/etc/profile2:按字母键i进入编辑模式,在最底部添加内容:JAVAHOME/opt/jdk1.8.0152CLASSPATH.:$JAVAHOME/lib/dt.jar:$JAVAHOME/lib/tools.jarPATH$JAVAHOME/bin:$PATH3:生效配置
光头强的博客 光头强的博客
4个月前
Java面向对象试题
1、请创建一个Animal动物类,要求有方法eat()方法,方法输出一条语句“吃东西”。创建一个接口A,接口里有一个抽象方法fly()。创建一个Bird类继承Animal类并实现接口A里的方法输出一条有语句“鸟儿飞翔”,重写eat()方法输出一条语句“鸟儿吃虫”。在Test类中向上转型创建b对象,调用eat方法。然后向下转型调用eat()方
刚刚好 刚刚好
4个月前
css问题
1、在IOS中图片不显示(给图片加了圆角或者img没有父级)<div<imgsrc""/</divdiv{width:20px;height:20px;borderradius:20px;overflow:h
blmius blmius
1年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
小森森 小森森
4个月前
校园表白墙微信小程序V1.0 SayLove -基于微信云开发-一键快速搭建,开箱即用
后续会继续更新,敬请期待2.0全新版本欢迎添加左边的微信一起探讨!项目地址:(https://www.aliyun.com/activity/daily/bestoffer?userCodesskuuw5n)\2.Bug修复更新日历2.情侣脸功能大家不要使用了,现在阿里云的接口已经要收费了(土豪请随意),\\和注意
晴空闲云 晴空闲云
4个月前
css中box-sizing解放盒子实际宽高计算
我们知道传统的盒子模型,如果增加内边距padding和边框border,那么会撑大整个盒子,造成盒子的宽度不好计算,在实务中特别不方便。boxsizing可以设置盒模型的方式,可以很好的设置固定宽高的盒模型。盒子宽高计算假如我们设置如下盒子:宽度和高度均为200px,那么这会这个盒子实际的宽高就都是200px。但是当我们设置这个盒子的边框和内间距的时候,那
艾木酱 艾木酱
3个月前
快速入门|使用MemFire Cloud构建React Native应用程序
MemFireCloud是一款提供云数据库,用户可以创建云数据库,并对数据库进行管理,还可以对数据库进行备份操作。它还提供后端即服务,用户可以在1分钟内新建一个应用,使用自动生成的API和SDK,访问云数据库、对象存储、用户认证与授权等功能,可专
Stella981 Stella981
1年前
Android So动态加载 优雅实现与原理分析
背景:漫品Android客户端集成适配转换功能(基于目标识别(So库35M)和人脸识别库(5M)),导致apk体积50M左右,为优化客户端体验,决定实现So文件动态加载.!(https://oscimg.oschina.net/oscnet/00d1ff90e4b34869664fef59e3ec3fdd20b.png)点击上方“蓝字”关注我
helloworld_28799839 helloworld_28799839
4个月前
常用知识整理
Javascript判断对象是否为空jsObject.keys(myObject).length0经常使用的三元运算我们经常遇到处理表格列状态字段如status的时候可以用到vue