ElasticSearch高版本API的使用姿势

原创 吴就业 122 0 2020-03-05

本文为博主原创文章,未经博主允许不得转载。

本文链接:https://wujiuye.com/article/cda4f6c731d94a33ae38d955cce4cde9

作者:吴就业
链接:https://wujiuye.com/article/cda4f6c731d94a33ae38d955cce4cde9
来源:吴就业的网络日记
本文为博主原创文章,未经博主允许不得转载。

如何在Java项目中使用elasticsearch-rest-high-level-client

直接http接口调用

img

实际上使用kibana或者elasticsearch-rest-high-level-client最终也是发送http请求。不同于redis这类服务,需要去了解它的通信协议,再通过Socket编程去实现通信,因此都是直接使用别人封装好的API。而ES提供了RESTFUL接口,就不需要我们去了解协议,因此,最简单的方式就是直接构造请求body发送http请求访问ES

String esUrl = String.format("%s/%s/_search",elasticsearchConfig.getClusterNodes(),INDEX);
// 发送http请求
String responseStr = HttpUtil.requestPostWithJson(esUrl,searchBuilder.toString());
// 对响应结果responseStr进行解析

这里还是会用到ESAPI,使用SearchBuilder构造请求体。当然也可以不使用,自己实现。

SearchSourceBuilder searchBuilder = new SearchSourceBuilder();
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery()
                .must(....);
searchBuilder.query(boolQueryBuilder);

但是构造请求body也是很繁琐的事情,因此一般会选择使用封装的API

基于API封装便用方式

添加elasticsearch-rest-high-level-client依赖。添加依赖时需要排除elasticsearchelasticsearch-rest-client包的依赖,因为默认是依赖低版本的,这里有个坑。排除之后再自己添加对应版本的elasticsearchelasticsearch-rest-client包的依赖就行了。(项目中用的是maven,我还是喜欢用gradle)。

<!-- es -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>${elasticsearch.version}</version>
    <exclusions>
        <!-- 默认引入的低版本 所以要排除重新依赖 -->
        <exclusion>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
        </exclusion>
            <exclusion>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<!-- 重新依赖 -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>${elasticsearch.version}</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>${elasticsearch.version}</version>
</dependency>

接着在application.yml配置文件中添加es的配置,名称自己取。

# 要求ES 7.x版本
es:
  host: 127.0.0.1
  port: 9400
  scheme: http

读取配置信息:

@Component
@ConfigurationProperties(prefix = "es")
public class ElasticSearchPropertys {
    private String host;
    private int port;
    private String scheme;
}

然后根据配置创建一个RestHighLevelClient注入到spring容器。

@Configuration
public class ElasticSearchConfig {

    @Resource
    private ElasticSearchPropertys elasticSearchPropertys;

    @Bean
    public RestHighLevelClient restHighLevelClient() {
        RestClientBuilder restClientBuilder = RestClient.builder(
                new HttpHost(elasticSearchPropertys.getHost(),
                        elasticSearchPropertys.getPort(), elasticSearchPropertys.getScheme()));
        return new RestHighLevelClient(restClientBuilder);
    }

}

如果是集群,自己改下就好了,RestClientbuilder方法是支持传多个HttpHost的。

然后就可以愉快的使用RestHighLevelClient提供的API实现CURD操作了。为了便于使用,可以基于RestHighLevelClient再封装一层。

/**
 * 封装ES通用API
 *
 * @author wujiuye
 * @date 2020/03/04
 */
@Component
@ConditionalOnBean(RestHighLevelClient.class)
public class ElasticSearchService {

    @Resource
    RestHighLevelClient restHighLevelClient;

    /**
     * 判断某个index是否存在
     *
     * @param index index名
     */
    public boolean existIndex(String index) throws Exception {
    }

    /**
     * 创建索引(仅测试使用)
     *
     * @param index    索引名称
     * @param mappings 索引描述
     * @param shards   分片数
     * @param replicas 副本数
     */
    public void createIndex(String index, EsIndexMappings mappings, int shards, int replicas) throws Exception {
    }

    /**
     * 插入或更新单条记录
     *
     * @param index  index
     * @param entity 对象
     */
    public void insertOrUpdate(String index, EsEntity entity) throws Exception {
    }

    /**
     * 批量插入数据
     *
     * @param index index
     * @param list  带插入列表
     */
    public void insertBatch(String index, List<EsEntity> list) throws Exception {
    }

    /**
     * 搜索
     *
     * @param index   index
     * @param builder 查询参数
     * @param c       结果对象类型
     */
    public <T> List<T> search(String index, SearchSourceBuilder builder, Class<T> c) throws Exception {
    }

    .......

}

在开发过程中,我们需要本地测试,或者连接测试环境的ES进行测试。为了方便,我将在创建索引的动作写在代码中,当判断环境为dev环境时,删除索引重建。因此,我也封装了创建索引的逻辑。

首先是定义一个注解,用于注释在实体类的字段上,用于创建索引时构造mapping。如果需要更多信息,添加到EsField注解,并完善解析逻辑就可以了。

/**
 * ES索引字段映射,用于代码创建索引 (仅测试使用)
 *
 * @author wujiuye
 * @date 2020/03/04
 */
@Target({ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface EsField {

    /**
     * 字段类型
     *
     * @return
     */
    String type() default "text";

}

例如:

public class Product {

    /**
     * 商品id
     */
    @EsField(type = "integer")
    private Integer leId;
    /**
     * 品牌id
     */
    @EsField(type = "integer")
    private Integer brandId;
    /**
     * 品牌名称
     */
    private String brandName;
    /**
     * 日期
     */
    private String date;
}

实现注解EsField的解析,生成EsIndexMappings对象。

/**
 * 用于代码创建索引(仅测试使用)
 *
 * @author wujiuye
 * @date 2020/03/04
 */
public class EsIndexMappings {

    private boolean dynamic = false;
    private Map<String, Map<String, Object>> properties;

    /**
     * 生成索引字段映射信息
     *
     * @param dynamic
     * @param type
     * @return
     */
    public static EsIndexMappings byType(boolean dynamic, Class<?> type) {
        EsIndexMappings esIndexMappings = new EsIndexMappings();
        esIndexMappings.setDynamic(dynamic);
        esIndexMappings.setProperties(new HashMap<>());
        Field[] fields = type.getDeclaredFields();
        for (Field field : fields) {
            Map<String, Object> value = new HashMap<>();
            EsField esField = field.getAnnotation(EsField.class);
            if (esField == null) {
                value.put("type", "text");
                value.put("index", true);
            } else {
                value.put("type", esField.type());
                value.put("index", esField.index());
            }
            esIndexMappings.getProperties().put(field.getName(), value);
        }
        return esIndexMappings;
    }

}

创建索引方法的实现:

/**
     * 创建索引(仅测试使用)
     *
     * @param index    索引名称
     * @param mappings 索引描述
     * @param shards   分片数
     * @param replicas 副本数
     */
    public void createIndex(String index, EsIndexMappings mappings, int shards, int replicas) throws Exception {
        if (this.existIndex(index)) {
            return;
        }
        CreateIndexRequest request = new CreateIndexRequest(index);
        request.settings(Settings.builder()
                // 分片数
                .put("index.number_of_shards", shards)
                // 副本数
                .put("index.number_of_replicas", replicas));
        // 指定mappings
        request.mapping(JSON.toJSONString(mappings), XContentType.JSON);
        CreateIndexResponse res = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
        if (!res.isAcknowledged()) {
            throw new RuntimeException("所以创建失败!");
        }
    }

使用例子:

elasticService.createIndex(INDEX, EsIndexMappings.byType(false, Product.class), 1, 1);

在插入对象时,我们可能会有指定文档id的需求,因此,为了封装更通用的插入和批量插入方法,需要抽象一个中间对象EsEntity

public class EsEntity {
    /**
     * 索引的_id,不指定则使用es自动生成的
     */
    private String id;

    /**
     * 不转中间对象,直接转为json字符串,避免批量插入浪费内存资源
     */
    private String jsonData;
}

提供将任意对象转为EsEntity的静态方法,支持指定id和不指定id,当不指定id时,ES会自动生成。

/**
     * 将任意类型对象转为EsEntity
     * 不指定_id
     *
     * @param obj 一个文档(记录)
     * @param <T>
     * @return
     */
    public static <T> EsEntity objToElasticEntity(T obj) {
        return objToElasticEntity(null, obj);
    }

    /**
     * 将任意类型对象转为EsEntity
     *
     * @param id  null:不指定_id,非null:指定_id
     * @param obj 一个文档(记录)
     * @param <T>
     * @return
     */
    public static <T> EsEntity objToElasticEntity(Integer id, T obj) {
        EsEntity elasticEntity = new EsEntity();
        String data = JSON.toJSONString(obj);
        elasticEntity.setId(id == null ? null : String.valueOf(id));
        elasticEntity.setData(data);
        return elasticEntity;
    }

插入和批量插入的实现:

 /**
     * 插入或更新单条记录
     *
     * @param index  index
     * @param entity 对象
     */
    public void insertOrUpdate(String index, EsEntity entity) throws Exception {
        IndexRequest request = new IndexRequest(index);
        request.id(entity.getId());
        request.source(entity.getData(), XContentType.JSON);
        IndexResponse response = restHighLevelClient.index(request, RequestOptions.DEFAULT);
        if (response.status() != RestStatus.OK) {
            throw new RuntimeException(response.toString());
        }
    }

    /**
     * 批量插入数据
     *
     * @param index index
     * @param list  带插入列表
     */
    public void insertBatch(String index, List<EsEntity> list) throws Exception {
        BulkRequest request = new BulkRequest();
        list.forEach(item -> request.add(new IndexRequest(index)
                .id(item.getId())
                .source(item.getData(), XContentType.JSON)));
        BulkResponse response = restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
        if (response.hasFailures()) {
            throw new RuntimeException(response.buildFailureMessage());
        }
    }

批量插入的使用例子:

List<Product> products = new ArrayList<>();
elasticService.insertBatch(INDEX,
    products.stream().map(EsEntity::objToElasticEntity)
    .collect(Collectors.toList()));

总结

个人更倾向于基于API封装的方式,简单通用。但是要注意,批量插入数据时,不要产生太多的中间对象,造成内存空间浪费。比如从数据库查询出来的结果转成中间对象,又转成Map对象再插入ES

#后端

声明:公众号、CSDN、掘金的曾用名:“Java艺术”,因此您可能看到一些早期的文章的图片有“Java艺术”的水印。

文章推荐

一篇文章说清楚Java的全局异常处理,深入到hotspot源码

本篇将介绍如何使用Java提供的全局异常处理,以及分析一点hotspot虚拟机的源码,让大家了解虚拟机是如何将异常交给全局异常处理器处理的。

使用Docker部署用于学习的ElasticSearch集群

在Linux服务器上使用 Docker安装ElasticSearch集群。

使用Mybatis-Plus提高开发效率

使用`mybatis-plus`可以少写很多常用的`SQL`,通过继承`BaseMapper`使用,还可以动态拼接`SQL`。第一眼看到我还以为是`JPA`。

64位JVM的Java对象头详解

在学习并发编程知识`synchronized`时,我们总是难以理解其实现原理,因为偏向锁、轻量级锁、重量级锁都涉及到对象头,所以了解`java`对象头是我们深入了解`synchronized`的前提条件。

Java锁事之Unsafe、CAS、AQS知识点总结

Unsafe、CAS、AQS是我们了解Java中除synchronized之外的锁必须要掌握的重要知识点。CAS是一个比较和替换的原子操作,AQS的实现强依赖CAS,而在Java中,CAS操作需通过使用Unsafe提供的方法实现。

Dubbo路由功能实现灰度发布及源码分析

灰度发布是实现新旧版本平滑过渡的一种发布方式,即让一部分服务更新到新版本,如果这部分服务没有什么问题,再将其它旧版本的服务更新。而实现简单的灰度发布我们可以使用版本号控制,每次发布都更新版本号,新更新的服务就不会调用旧的服务提供者。