如何在Java
项目中使用elasticsearch-rest-high-level-client
。
直接http接口调用
实际上使用kibana
或者elasticsearch-rest-high-level-client
最终也是发送http
请求。不同于redis
这类服务,需要去了解它的通信协议,再通过Socket
编程去实现通信,因此都是直接使用别人封装好的API
。而ES
提供了RESTFUL
接口,就不需要我们去了解协议,因此,最简单的方式就是直接构造请求body
发送http
请求访问ES
。
String esUrl = String.format("%s/%s/_search",elasticsearchConfig.getClusterNodes(),INDEX);
// 发送http请求
String responseStr = HttpUtil.requestPostWithJson(esUrl,searchBuilder.toString());
// 对响应结果responseStr进行解析
这里还是会用到ES
的API
,使用SearchBuilder
构造请求体。当然也可以不使用,自己实现。
SearchSourceBuilder searchBuilder = new SearchSourceBuilder();
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery()
.must(....);
searchBuilder.query(boolQueryBuilder);
但是构造请求body
也是很繁琐的事情,因此一般会选择使用封装的API
。
基于API封装便用方式
添加elasticsearch-rest-high-level-client
依赖。添加依赖时需要排除elasticsearch
、elasticsearch-rest-client
包的依赖,因为默认是依赖低版本的,这里有个坑。排除之后再自己添加对应版本的elasticsearch
、elasticsearch-rest-client
包的依赖就行了。(项目中用的是maven
,我还是喜欢用gradle
)。
<!-- es -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>${elasticsearch.version}</version>
<exclusions>
<!-- 默认引入的低版本 所以要排除重新依赖 -->
<exclusion>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
</exclusion>
<exclusion>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- 重新依赖 -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
接着在application.yml
配置文件中添加es
的配置,名称自己取。
# 要求ES 7.x版本
es:
host: 127.0.0.1
port: 9400
scheme: http
读取配置信息:
@Component
@ConfigurationProperties(prefix = "es")
public class ElasticSearchPropertys {
private String host;
private int port;
private String scheme;
}
然后根据配置创建一个RestHighLevelClient
注入到spring
容器。
@Configuration
public class ElasticSearchConfig {
@Resource
private ElasticSearchPropertys elasticSearchPropertys;
@Bean
public RestHighLevelClient restHighLevelClient() {
RestClientBuilder restClientBuilder = RestClient.builder(
new HttpHost(elasticSearchPropertys.getHost(),
elasticSearchPropertys.getPort(), elasticSearchPropertys.getScheme()));
return new RestHighLevelClient(restClientBuilder);
}
}
如果是集群,自己改下就好了,RestClient
的builder
方法是支持传多个HttpHost
的。
然后就可以愉快的使用RestHighLevelClient
提供的API
实现CURD
操作了。为了便于使用,可以基于RestHighLevelClient
再封装一层。
/**
* 封装ES通用API
*
* @author wujiuye
* @date 2020/03/04
*/
@Component
@ConditionalOnBean(RestHighLevelClient.class)
public class ElasticSearchService {
@Resource
RestHighLevelClient restHighLevelClient;
/**
* 判断某个index是否存在
*
* @param index index名
*/
public boolean existIndex(String index) throws Exception {
}
/**
* 创建索引(仅测试使用)
*
* @param index 索引名称
* @param mappings 索引描述
* @param shards 分片数
* @param replicas 副本数
*/
public void createIndex(String index, EsIndexMappings mappings, int shards, int replicas) throws Exception {
}
/**
* 插入或更新单条记录
*
* @param index index
* @param entity 对象
*/
public void insertOrUpdate(String index, EsEntity entity) throws Exception {
}
/**
* 批量插入数据
*
* @param index index
* @param list 带插入列表
*/
public void insertBatch(String index, List<EsEntity> list) throws Exception {
}
/**
* 搜索
*
* @param index index
* @param builder 查询参数
* @param c 结果对象类型
*/
public <T> List<T> search(String index, SearchSourceBuilder builder, Class<T> c) throws Exception {
}
.......
}
在开发过程中,我们需要本地测试,或者连接测试环境的ES
进行测试。为了方便,我将在创建索引的动作写在代码中,当判断环境为dev
环境时,删除索引重建。因此,我也封装了创建索引的逻辑。
首先是定义一个注解,用于注释在实体类的字段上,用于创建索引时构造mapping
。如果需要更多信息,添加到EsField
注解,并完善解析逻辑就可以了。
/**
* ES索引字段映射,用于代码创建索引 (仅测试使用)
*
* @author wujiuye
* @date 2020/03/04
*/
@Target({ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface EsField {
/**
* 字段类型
*
* @return
*/
String type() default "text";
}
例如:
public class Product {
/**
* 商品id
*/
@EsField(type = "integer")
private Integer leId;
/**
* 品牌id
*/
@EsField(type = "integer")
private Integer brandId;
/**
* 品牌名称
*/
private String brandName;
/**
* 日期
*/
private String date;
}
实现注解EsField
的解析,生成EsIndexMappings
对象。
/**
* 用于代码创建索引(仅测试使用)
*
* @author wujiuye
* @date 2020/03/04
*/
public class EsIndexMappings {
private boolean dynamic = false;
private Map<String, Map<String, Object>> properties;
/**
* 生成索引字段映射信息
*
* @param dynamic
* @param type
* @return
*/
public static EsIndexMappings byType(boolean dynamic, Class<?> type) {
EsIndexMappings esIndexMappings = new EsIndexMappings();
esIndexMappings.setDynamic(dynamic);
esIndexMappings.setProperties(new HashMap<>());
Field[] fields = type.getDeclaredFields();
for (Field field : fields) {
Map<String, Object> value = new HashMap<>();
EsField esField = field.getAnnotation(EsField.class);
if (esField == null) {
value.put("type", "text");
value.put("index", true);
} else {
value.put("type", esField.type());
value.put("index", esField.index());
}
esIndexMappings.getProperties().put(field.getName(), value);
}
return esIndexMappings;
}
}
创建索引方法的实现:
/**
* 创建索引(仅测试使用)
*
* @param index 索引名称
* @param mappings 索引描述
* @param shards 分片数
* @param replicas 副本数
*/
public void createIndex(String index, EsIndexMappings mappings, int shards, int replicas) throws Exception {
if (this.existIndex(index)) {
return;
}
CreateIndexRequest request = new CreateIndexRequest(index);
request.settings(Settings.builder()
// 分片数
.put("index.number_of_shards", shards)
// 副本数
.put("index.number_of_replicas", replicas));
// 指定mappings
request.mapping(JSON.toJSONString(mappings), XContentType.JSON);
CreateIndexResponse res = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
if (!res.isAcknowledged()) {
throw new RuntimeException("所以创建失败!");
}
}
使用例子:
elasticService.createIndex(INDEX, EsIndexMappings.byType(false, Product.class), 1, 1);
在插入对象时,我们可能会有指定文档id
的需求,因此,为了封装更通用的插入和批量插入方法,需要抽象一个中间对象EsEntity
。
public class EsEntity {
/**
* 索引的_id,不指定则使用es自动生成的
*/
private String id;
/**
* 不转中间对象,直接转为json字符串,避免批量插入浪费内存资源
*/
private String jsonData;
}
提供将任意对象转为EsEntity
的静态方法,支持指定id
和不指定id
,当不指定id
时,ES
会自动生成。
/**
* 将任意类型对象转为EsEntity
* 不指定_id
*
* @param obj 一个文档(记录)
* @param <T>
* @return
*/
public static <T> EsEntity objToElasticEntity(T obj) {
return objToElasticEntity(null, obj);
}
/**
* 将任意类型对象转为EsEntity
*
* @param id null:不指定_id,非null:指定_id
* @param obj 一个文档(记录)
* @param <T>
* @return
*/
public static <T> EsEntity objToElasticEntity(Integer id, T obj) {
EsEntity elasticEntity = new EsEntity();
String data = JSON.toJSONString(obj);
elasticEntity.setId(id == null ? null : String.valueOf(id));
elasticEntity.setData(data);
return elasticEntity;
}
插入和批量插入的实现:
/**
* 插入或更新单条记录
*
* @param index index
* @param entity 对象
*/
public void insertOrUpdate(String index, EsEntity entity) throws Exception {
IndexRequest request = new IndexRequest(index);
request.id(entity.getId());
request.source(entity.getData(), XContentType.JSON);
IndexResponse response = restHighLevelClient.index(request, RequestOptions.DEFAULT);
if (response.status() != RestStatus.OK) {
throw new RuntimeException(response.toString());
}
}
/**
* 批量插入数据
*
* @param index index
* @param list 带插入列表
*/
public void insertBatch(String index, List<EsEntity> list) throws Exception {
BulkRequest request = new BulkRequest();
list.forEach(item -> request.add(new IndexRequest(index)
.id(item.getId())
.source(item.getData(), XContentType.JSON)));
BulkResponse response = restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
if (response.hasFailures()) {
throw new RuntimeException(response.buildFailureMessage());
}
}
批量插入的使用例子:
List<Product> products = new ArrayList<>();
elasticService.insertBatch(INDEX,
products.stream().map(EsEntity::objToElasticEntity)
.collect(Collectors.toList()));
总结
个人更倾向于基于API
封装的方式,简单通用。但是要注意,批量插入数据时,不要产生太多的中间对象,造成内存空间浪费。比如从数据库查询出来的结果转成中间对象,又转成Map
对象再插入ES
。