• A+

es-05-获取 RestHighLevelClient 及 API

在 6.x 以前, 使用最多的是 TransportClient, 但它在 7.x 会被废弃。

先说以前的创建方式: 

具体可见: https://www.cnblogs.com/wenbronk/p/6383194.html

复制代码
    /**
     * Opens the transport connection (first approach): the cluster name is passed
     * in through a settings map.
     *
     * @throws Exception e.g. when the host name cannot be resolved
     */
//    @Before
    public void before() throws Exception {
        Map<String, String> clusterProps = new HashMap<>();
        clusterProps.put("cluster.name", "elasticsearch_wenbronk");
        Settings.Builder settingsBuilder = Settings.builder().put(clusterProps);

        // Build the client first, then resolve and register the node address.
        TransportClient transport = TransportClient.builder().settings(settingsBuilder).build();
        InetAddress nodeHost = InetAddress.getByName("www.wenbronk.com");
        int nodePort = Integer.parseInt("9300");
        client = transport.addTransportAddress(new InetSocketTransportAddress(nodeHost, nodePort));
    }
    
    /**
     * Opens the transport connection (second approach): the cluster settings are
     * configured key by key on a settings builder.
     *
     * @throws Exception if the client cannot be created
     */
    @Before
    public void before11() throws Exception {
        // Alternative: create the client with the default cluster name, "elasticSearch"
//        client = TransportClient.builder().build()
//                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("www.wenbronk.com"), 9300));

        // Describe the cluster through a Settings object, starting with the cluster name.
        Settings.Builder config = Settings.settingsBuilder();
        config.put("cluster.name", "elasticsearch_wenbronk");
        // Options that were tried and left disabled:
//        config.put("client.transport.sniff", true); // sniffing; with it on the connection never succeeded, cause unknown
//        config.put("network.host", "192.168.50.37");
//        config.put("client.transport.nodes_sampler_interval", 5); // raised an error
//        config.put("client.transport.ping_timeout", 5); // raised an error; ping wait time
        // Skip cluster-name validation: the client connects even if the name is wrong.
        config.put("client.transport.ignore_cluster_name", true);
        Settings settings = config.build();

        TransportClient transport = TransportClient.builder().settings(settings).build();
        InetSocketAddress nodeAddress = new InetSocketAddress("192.168.50.37", 9300);
        client = transport.addTransportAddress(new InetSocketTransportAddress(nodeAddress));
        // Original notes: "default 5s" / "how long until the connection is opened, default 5s"
        // (presumably about the two timeout options above - TODO confirm).
        System.out.println("success connect");
    }
复制代码

 

最新的创建方式: 

复制代码
package com.wenbronk.elasticsearch.usage.highLevel;

import com.google.common.collect.Lists;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;

/**
 * Base class for tests that need a {@link RestHighLevelClient}: a client is
 * created before each test and closed after it.
 */
public class RestHighLevelClientParent {

    /** Comma-separated list of node hosts the client connects to. */
    public static final String HOST = "10.124.147.22,10.124.147.23,10.124.147.32";
    /** Port used for every host in {@link #HOST}. */
    public static final Integer PORT = 9200;

    /** Client shared with subclasses; assigned in {@link #testBefore()}. */
    protected RestHighLevelClient client;

    /**
     * Builds one {@link HttpHost} per entry of {@link #HOST} and creates the client.
     */
    @BeforeEach
    public void testBefore() {
        // Map each host name directly to an HttpHost instead of filling a list by side effect.
        HttpHost[] hosts = Arrays.stream(HOST.split(","))
                .map(host -> new HttpHost(host, PORT, "http"))
                .toArray(HttpHost[]::new);

        client = new RestHighLevelClient(RestClient.builder(hosts));
    }

    /**
     * Closes the client created in {@link #testBefore()}.
     *
     * @throws IOException if closing the client fails
     */
    @AfterEach
    public void testAfter() throws IOException {
        // If setup failed the client was never assigned; a NullPointerException
        // here would hide the original setup error.
        if (client != null) {
            client.close();
        }
    }

}
复制代码

 

新的客户端, 对异步的处理方式和以前基本相同

1), index

复制代码
/**
 * Indexes one document asynchronously and blocks until the listener has been
 * notified of either success or failure.
 *
 * @throws IOException          if the JSON source cannot be built
 * @throws InterruptedException if interrupted while waiting for the callback
 */
@Test
    public void testAsync() throws IOException, InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        XContentBuilder object = XContentFactory.jsonBuilder()
                .startObject()
                .field("user", "wenbronk")
                .timeField("postData", new Date())
                .field("message", "message format from xcontent")
                .endObject();
        IndexRequest source = new IndexRequest("test", "doc", "3").source(object);
        client.indexAsync(source, new ActionListener<IndexResponse>() {
            @Override
            public void onResponse(IndexResponse indexResponse) {
                // Values available on the response (kept to illustrate the API).
                String id = indexResponse.getId();
                String index = indexResponse.getIndex();
                String type = indexResponse.getType();

                if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
                    // placeholder: the document was created
                }
                if (indexResponse.getResult() == DocWriteResponse.Result.DELETED) {
                    // placeholder: the document was deleted
                }
                if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
                    // placeholder: the document was updated
                }

                ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
                if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
                    // some shards did not succeed
                }
                // handle the shards that failed
                if (shardInfo.getFailed() != 0) {
                    for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
                        String reason = failure.reason();
                    }
                }
                countDownLatch.countDown();
            }

            @Override
            public void onFailure(Exception e) {
                if (e instanceof ElasticsearchException) {
                    ElasticsearchException e1 = (ElasticsearchException) e;
                    if (e1.status() == RestStatus.CONFLICT) {
                        System.out.println("版本冲突 ");
                    }
                }
                // Release the waiting test thread on failure too; otherwise await() never returns.
                countDownLatch.countDown();
            }
        });
        countDownLatch.await();
    }
复制代码

2), get

复制代码
/**
 * Fetches one document asynchronously, prints its source fields and blocks
 * until the listener has been notified.
 *
 * @throws InterruptedException if interrupted while waiting for the callback
 */
@Test
    public void testGet() throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        // index, type, id
        GetRequest request = new GetRequest("test", "doc", "1");

        // Optional arguments
        // Disable _source retrieval
//        request.fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE);

        // Configure source inclusion for specific fields
//        String[] includes = new String[]{"message", "*Data"};
//        String[] excludes = Strings.EMPTY_ARRAY;
//        FetchSourceContext fetchSourceContext =
//                new FetchSourceContext(true, includes, excludes);
//        request.fetchSourceContext(fetchSourceContext);

        // Retrieve specific stored fields (not a source exclusion)
//        request.storedFields("message");

        request.routing("routing");
        request.parent("parent");
        request.preference("preference");
        request.realtime(false);
        request.refresh(true);
        request.version(2);
        request.versionType(VersionType.EXTERNAL);

        // Handle the response
        client.getAsync(request, new ActionListener<GetResponse>() {
            @Override
            public void onResponse(GetResponse getResponse) {
                try {
                    String type = getResponse.getType();
                    String index = getResponse.getIndex();
                    String id = getResponse.getId();
                    // A missing document is delivered as a response without a source,
                    // so check existence before touching the source map.
                    if (getResponse.isExists()) {
                        Map<String, Object> sourceAsMap = getResponse.getSourceAsMap();
                        sourceAsMap.forEach((key, value) -> System.out.println(key + " : " + value));
                    } else {
                        System.out.println("要找的id不存在");
                    }
                } finally {
                    // Always release the waiting test thread, even if the handler throws.
                    countDownLatch.countDown();
                }
            }

            @Override
            public void onFailure(Exception e) {
                // Possible errors: 1) not found, 2) version conflict
                if (e instanceof ElasticsearchException) {
                    if (((ElasticsearchException) e).status() == RestStatus.NOT_FOUND) {
                        System.out.println("要找的id不存在");
                    }

                    if (((ElasticsearchException) e).status() == RestStatus.CONFLICT) {
                        System.out.println("conflict ");
                    }
                }
                countDownLatch.countDown();
            }
        });
        countDownLatch.await();
    }
复制代码

3), update

复制代码
/**
 * Updates one document asynchronously with a partial document and waits for
 * the listener to be notified before returning.
 *
 * @throws IOException if the JSON partial document cannot be built
 */
@Test
    public void testResponse() throws IOException {
        // Keeps the test alive until the callback ran, as testAsync/testGet do.
        CountDownLatch countDownLatch = new CountDownLatch(1);
        XContentBuilder builder = XContentFactory.jsonBuilder()
                .startObject()
                .timeField("updated", new Date())
                .field("reason", "daily update")
                .endObject();
        UpdateRequest request = new UpdateRequest("posts", "doc", "1")
                .doc(builder);

        client.updateAsync(request, new ActionListener<UpdateResponse>() {
            @Override
            public void onResponse(UpdateResponse updateResponse) {
                try {
                    String index = updateResponse.getIndex();
                    String type = updateResponse.getType();
                    String id = updateResponse.getId();
                    long version = updateResponse.getVersion();
                    if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
                        // placeholder: document was created (upsert)
                    } else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
                        // placeholder: document was updated
                    } else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) {
                        // placeholder: document was deleted
                    } else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) {
                        // placeholder: nothing changed
                    }

                    // The request above does not ask for the updated source, so the
                    // GetResult may be absent; guard against null before using it.
                    GetResult result = updateResponse.getGetResult();
                    if (result != null && result.isExists()) {
                        String sourceAsString = result.sourceAsString();
                        Map<String, Object> sourceAsMap = result.sourceAsMap();
                        byte[] sourceAsBytes = result.source();
                    } else {
                        // placeholder: no source returned with the response
                    }

                    ReplicationResponse.ShardInfo shardInfo = updateResponse.getShardInfo();
                    if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
                        // placeholder: some shards did not succeed
                    }
                    if (shardInfo.getFailed() > 0) {
                        for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
                            String reason = failure.reason();
                        }
                    }
                } finally {
                    countDownLatch.countDown();
                }
            }

            @Override
            public void onFailure(Exception e) {
                // placeholder: handle the failed request; still release the waiting thread
                countDownLatch.countDown();
            }
        });

        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            // Signature is kept as-is, so restore the interrupt flag instead of rethrowing.
            Thread.currentThread().interrupt();
        }
    }
复制代码

4), delete

复制代码
/**
 * Deletes one document asynchronously and waits for the listener to be
 * notified before returning.
 */
@Test
    public void testDelete() {
        // Keeps the test alive until the callback ran, as testAsync/testGet do.
        CountDownLatch countDownLatch = new CountDownLatch(1);
        DeleteRequest deleteRequest = new DeleteRequest("test", "doc", "3");

        client.deleteAsync(deleteRequest, new ActionListener<DeleteResponse>() {
            @Override
            public void onResponse(DeleteResponse deleteResponse) {
                try {
                    String index = deleteResponse.getIndex();
                    String type = deleteResponse.getType();
                    String id = deleteResponse.getId();
                    long version = deleteResponse.getVersion();
                    ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo();
                    if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
                        // placeholder: some shards did not succeed
                    }
                    if (shardInfo.getFailed() > 0) {
                        for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
                            String reason = failure.reason();
                        }
                    }
                } finally {
                    countDownLatch.countDown();
                }
            }

            @Override
            public void onFailure(Exception e) {
                // Possible errors: 1) not found, 2) version conflict.
                // NOTE(review): a missing document may instead arrive in onResponse
                // with a "not found" result - confirm against the client docs.
                if (e instanceof ElasticsearchException) {
                    if (((ElasticsearchException) e).status() == RestStatus.NOT_FOUND) {
                        System.out.println("id不存在");
                    }
                    if (((ElasticsearchException) e).status() == RestStatus.CONFLICT) {
                        System.out.println("conflict ");
                    }
                }
                countDownLatch.countDown();
            }
        });

        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            // Signature is kept as-is, so restore the interrupt flag instead of rethrowing.
            Thread.currentThread().interrupt();
        }
    }
复制代码

5), bulk

复制代码
 /**
  * Sends a delete, an update and an index operation in one asynchronous bulk
  * request and waits for the listener to be notified before returning.
  */
 @Test
    public void testBulk() {
        // Keeps the test alive until the callback ran, as testAsync/testGet do.
        CountDownLatch countDownLatch = new CountDownLatch(1);
        BulkRequest request = new BulkRequest();
        request.add(new DeleteRequest("posts", "doc", "3"));
        request.add(new UpdateRequest("posts", "doc", "2")
                .doc(XContentType.JSON,"other", "test"));
        request.add(new IndexRequest("posts", "doc", "4")
                .source(XContentType.JSON,"field", "baz"));

        // Optional arguments
        request.timeout(TimeValue.timeValueMinutes(2));
        request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
        request.waitForActiveShards(2);

        // Asynchronous handling
        client.bulkAsync(request, new ActionListener<BulkResponse>() {
            @Override
            public void onResponse(BulkResponse bulkItemResponses) {
                try {
                    for (BulkItemResponse bulkItemResponse : bulkItemResponses) {
                        // Handle failed items first: they carry a Failure rather than a
                        // typed response, so there is nothing to cast for them.
                        if (bulkItemResponse.isFailed()) {
                            BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
                            continue;
                        }

                        DocWriteResponse itemResponse = bulkItemResponse.getResponse();
                        switch (bulkItemResponse.getOpType()) {
                            case INDEX:
                            case CREATE:
                                IndexResponse indexResponse = (IndexResponse) itemResponse;
                                break;
                            case UPDATE:
                                UpdateResponse updateResponse = (UpdateResponse) itemResponse;
                                break;
                            case DELETE:
                                DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
                                break;
                            default:
                                break;
                        }
                    }
                } finally {
                    countDownLatch.countDown();
                }
            }

            @Override
            public void onFailure(Exception e) {
                // placeholder: handle the failed bulk request; still release the waiting thread
                countDownLatch.countDown();
            }
        });

        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            // Signature is kept as-is, so restore the interrupt flag instead of rethrowing.
            Thread.currentThread().interrupt();
        }
    }
复制代码

6), BulkProcessor

复制代码
/**
 * Feeds three index requests through a {@code BulkProcessor} and waits up to
 * 30 seconds for the processor to flush and shut down.
 *
 * @throws InterruptedException if interrupted while waiting for the processor to close
 */
@Test
    public void testBulkProcess() throws InterruptedException {
        // Create the BulkProcessor builder with a listener that logs each bulk execution.
        BulkProcessor.Builder builder = BulkProcessor.builder(client::bulkAsync, new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                int numberOfActions = request.numberOfActions();
                logger.debug("Executing bulk [{}] with {} requests",
                        executionId, numberOfActions);
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                if (response.hasFailures()) {
                    logger.warn("Bulk [{}] executed with failures", executionId);
                } else {
                    logger.debug("Bulk [{}] completed in {} milliseconds",
                            executionId, response.getTook().getMillis());
                }
            }

            // Called when the bulk request as a whole failed.
            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                logger.error("Failed to execute bulk", failure);
            }
        });

        // Processor options
        builder.setBulkActions(500);        // flush once this many actions are queued
        builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB));        // flush once the queued requests reach this size
        builder.setConcurrentRequests(0);       // number of concurrent bulk requests allowed
        builder.setFlushInterval(TimeValue.timeValueSeconds(10L));      // periodic flush interval
        builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3));     // constant 1s back-off, 3 retries

        BulkProcessor bulkProcessor = builder.build();

        // Queue the documents to be indexed in bulk
        IndexRequest one = new IndexRequest("posts", "doc", "1").
                source(XContentType.JSON, "title", "In which order are my Elasticsearch queries executed?");
        IndexRequest two = new IndexRequest("posts", "doc", "2")
                .source(XContentType.JSON, "title", "Current status and upcoming changes in Elasticsearch");
        IndexRequest three = new IndexRequest("posts", "doc", "3")
                .source(XContentType.JSON, "title", "The Future of Federated Search in Elasticsearch");

        bulkProcessor.add(one);
        bulkProcessor.add(two);
        bulkProcessor.add(three);

        // awaitClose already closes the processor, so no separate close() call is
        // needed; report when the timeout elapsed before all bulks completed.
        boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);
        if (!terminated) {
            logger.warn("Bulk processor did not complete all requests within 30 seconds");
        }
    }
复制代码

7), multiget

复制代码
/**
 * Fetches two documents in one asynchronous multi-get request, inspects the
 * first item and waits for the listener to be notified before returning.
 */
@Test
    public void testMultiGet() {
        // Keeps the test alive until the callback ran, as testAsync/testGet do.
        CountDownLatch countDownLatch = new CountDownLatch(1);
        MultiGetRequest request = new MultiGetRequest();
        request.add(new MultiGetRequest.Item("index", "type", "example_id"));
        request.add(new MultiGetRequest.Item("index", "type", "another_id"));

        client.multiGetAsync(request, new ActionListener<MultiGetResponse>() {
            @Override
            public void onResponse(MultiGetResponse multiGetItemResponses) {
                // The assertion below may throw, so the latch is released in finally.
                try {
                    MultiGetItemResponse firstItem = multiGetItemResponses.getResponses()[0];
                    assertNull(firstItem.getFailure());
                    GetResponse firstGet = firstItem.getResponse();
                    String index = firstItem.getIndex();
                    String type = firstItem.getType();
                    String id = firstItem.getId();
                    if (firstGet.isExists()) {
                        long version = firstGet.getVersion();
                        String sourceAsString = firstGet.getSourceAsString();
                        Map<String, Object> sourceAsMap = firstGet.getSourceAsMap();
                        byte[] sourceAsBytes = firstGet.getSourceAsBytes();
                    } else {
                        // placeholder: the first document does not exist
                    }
                } finally {
                    countDownLatch.countDown();
                }
            }

            @Override
            public void onFailure(Exception e) {
                // placeholder: handle the failed request; still release the waiting thread
                countDownLatch.countDown();
            }
        });

        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            // Signature is kept as-is, so restore the interrupt flag instead of rethrowing.
            Thread.currentThread().interrupt();
        }
    }
复制代码

 

 

所属分类:产品

全部评论: 0

    我有话说:
    ×