• A+

elasticsearch(六)java 使用批量查询multiGet介绍及使用

BulkRequest是用来进行批量索引、更新、删除操作的请求对象,前面已经介绍过。

本节介绍下用来进行批量查询的操作: Mult-Get Request

1,首先创建一个主查询请求对象:
MultiGetRequest request = new MultiGetRequest();
然后依次使用主请求对象的add方法,将子查询对象加入到主查询中

request.add(new MultiGetRequest.Item("index", "type", "another_id"));
2,可以分别针对每一个子查询进行如下设置:
1)查询的文档内容不返回:.fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE)

request.add(new MultiGetRequest.Item("posts", "doc", "2").fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE));
2) 指定查询哪些字段内容 或 过滤掉哪些字段:

String[] includes = new String[] {"user", "*r"};
String[] excludes = Strings.EMPTY_ARRAY;
FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
request.add(new MultiGetRequest.Item("posts2", "doc", "3").fetchSourceContext(fetchSourceContext));
3)分别指定查询的路由分片和版本等:

request.add(new MultiGetRequest.Item("posts", "doc", "with_routing")
.routing("some_routing"));
request.add(new MultiGetRequest.Item("index", "type", "with_parent")
.parent("some_parent"));
request.add(new MultiGetRequest.Item("index", "type", "with_version")
.versionType(VersionType.EXTERNAL)
.version(10123L));
注:以上设置无法在主请求中设置

3,对主请求的设置
preference, realtime and refresh 需要在主请求里设置,子请求中无法设置这些值
request.preference("some_preference");
// realtime的值默认为true
request.realtime(false);
request.refresh(true);
4,执行请求并获取结果:
MultiGetResponse response = client.mget(request, RequestOptions.DEFAULT);
5,对结果的处理及说明:
1)可以指定处理某条查询:

MultiGetItemResponse firstItem = response.getResponses()[0];
2)遍历查询的结果:

for(MultiGetItemResponse item: response.getResponses()) {
String index = item.getIndex();
String type = item.getType();
String id = item.getId();
System.out.println("第" + ++count + "条-》index:" + index + "; type:" + type + "; id:" + id);
if(item.getFailure() != null) {
Exception e = item.getFailure().getFailure();
ElasticsearchException ee = (ElasticsearchException) e;
if(ee.getMessage().contains("reason=no such index")) {
System.out.println("查询的文档库不存在!");
}
}

GetResponse getResponse = item.getResponse();

if (getResponse.isExists()) {
long version = getResponse.getVersion();
String sourceAsString = getResponse.getSourceAsString();
System.out.println("查询的结果为:");
System.out.println(sourceAsString);
Map<String, Object> sourceAsMap = getResponse.getSourceAsMap();
byte[] sourceAsBytes = getResponse.getSourceAsBytes();
} else {
System.out.println("没有查询到相应文档");
}
}
特殊情况说明:

1)查询的index索引库不存在时,则返回结果的failure参数不为null,含报错信息,

GetResponse getResponse = item.getResponse() 得到的getResponse值为null
故为了后面数据抛出空指针异常,要做兼容处理:

if(item.getFailure() != null) {
Exception e = item.getFailure().getFailure();
ElasticsearchException ee = (ElasticsearchException) e;
if(ee.getMessage().contains("reason=no such index")) {
System.out.println("查询的文档库不存在!");
}
}
2)如果文档类型type或id不存在,getResponse.isExists() 的结果是false

6,完整代码示例:
package com.example.elasticsearch.document;

import org.apache.http.HttpHost;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;

import java.util.Map;

/**
* Created with IntelliJ IDEA.
*
* @Author: Weichang Zhong
* @Date: 2018/11/8
* @Time: 17:20
* @Description:
*/
public class SynMultiGetRequest {
public static void main(String[] args) {
try (RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(
new HttpHost("127.0.0.1", 9200, "http")
)
)) {
// 创建查询父对象
MultiGetRequest request = new MultiGetRequest();
// 添加子查询
request.add(new MultiGetRequest.Item("posts", "doc", "1"));
request.add(new MultiGetRequest.Item("posts", "doc2", "2").fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE));
String[] includes = new String[] {"user", "*r"};
String[] excludes = Strings.EMPTY_ARRAY;
FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
request.add(new MultiGetRequest.Item("posts2", "doc", "3").fetchSourceContext(fetchSourceContext));
// // user必须在map集合中
// request.add(new MultiGetRequest.Item("posts", "doc", "4")
// .storedFields("user"));
// MultiGetResponse response = client.mget(request, RequestOptions.DEFAULT);
// MultiGetItemResponse item = response.getResponses()[0];
// // user必须在map集合中
// String value = item.getResponse().getField("user").getValue();

// 针对每个子请求分别设置,无法在主请求中设置
// 指定去哪个分片上查询,如何指定分片上没有,不会再去其它分片查询,如果不指定,则依次轮询各个分片查询
request.add(new MultiGetRequest.Item("posts", "doc", "with_routing")
.routing("some_routing"));
request.add(new MultiGetRequest.Item("index", "type", "with_parent")
.parent("some_parent"));
request.add(new MultiGetRequest.Item("index", "type", "with_version")
.versionType(VersionType.EXTERNAL)
.version(10123L));
// preference, realtime and refresh 需要在主请求里设置,子请求中无法设置
request.preference("some_preference");
// realtime的值默认为true
request.realtime(false);
request.refresh(true);
MultiGetResponse response = client.mget(request, RequestOptions.DEFAULT);
int count = 0;
for(MultiGetItemResponse item: response.getResponses()) {
String index = item.getIndex();
String type = item.getType();
String id = item.getId();
System.out.println("第" + ++count + "条-》index:" + index + "; type:" + type + "; id:" + id);
if(item.getFailure() != null) {
Exception e = item.getFailure().getFailure();
ElasticsearchException ee = (ElasticsearchException) e;
if(ee.getMessage().contains("reason=no such index")) {
System.out.println("查询的文档库不存在!");
}
}

GetResponse getResponse = item.getResponse();

if (getResponse.isExists()) {
long version = getResponse.getVersion();
String sourceAsString = getResponse.getSourceAsString();
System.out.println("查询的结果为:");
System.out.println(sourceAsString);
Map<String, Object> sourceAsMap = getResponse.getSourceAsMap();
byte[] sourceAsBytes = getResponse.getSourceAsBytes();
} else {
System.out.println("没有查询到相应文档");
}
}
}catch (Exception e) {
e.printStackTrace();
}
}
}
另外:同时也可以查询map集合中的对象,前提是对应的map集合中要存在相应的文档:

// user必须在map集合中
request.add(new MultiGetRequest.Item("posts", "doc", "4").storedFields("user"));
MultiGetResponse response = client.mget(request, RequestOptions.DEFAULT);
MultiGetItemResponse item = response.getResponses()[0];
// user必须在map集合中
String value = item.getResponse().getField("user").getValue();

 

所属分类:产品

全部评论: 0

    我有话说:
    ×