irpas技术客

Querying More Than 10,000 Documents in ES: Scroll Queries (scroll), by 卡卡东~



Background

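As we all know, a regular Elasticsearch query returns at most the first 10,000 hits by default (the limit comes from the index.max_result_window setting), which is frustrating. When a big-data job needs to fetch all of the matching data at once, Elasticsearch offers two approaches: deep pagination and scroll queries. Here we use the scroll query, because we need to load all of the data we use in a single batch.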

Introduction

Deep pagination

Deep pagination queries with from and size. It is simple to use:

```
{ "query": { "match_all": {} }, "from": 9990, "size": 10 }
{ "query": { "match_all": {} }, "from": 9999, "size": 10 }
```

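When we fetch hits 9,991-10,000 (from 9990) or 10,000-10,009 (from 9999), Elasticsearch cannot jump straight to the offset: it has to collect the top 10,000 (or 10,009) hits first and then keep only the last 10. With multiple shards it gets worse: every shard returns its own top from + size hits, so the coordinating node has to sort (from + size) × number of shards entries just to return 10 documents. The deeper the page, the more memory and CPU this costs. In fact, with the default index.max_result_window of 10,000, the second request (from + size = 10,009) is rejected outright.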
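To put numbers on this, here is a small plain-Java sketch of the cost arithmetic described above. It assumes 5 shards purely for illustration and uses 10,000, the default value of index.max_result_window; the class and method names are invented for this example and nothing here talks to Elasticsearch.

```java
public class DeepPagingCost {
    // Default value of the index setting index.max_result_window
    static final int MAX_RESULT_WINDOW = 10_000;

    // Each shard must return its top (from + size) hits to the coordinating node
    static long docsPerShard(int from, int size) {
        return (long) from + size;
    }

    // The coordinating node merges and sorts the candidates from every shard
    static long docsSortedOnCoordinator(int from, int size, int shards) {
        return docsPerShard(from, size) * shards;
    }

    // A from/size request is rejected when from + size exceeds the result window
    static boolean exceedsWindow(int from, int size) {
        return docsPerShard(from, size) > MAX_RESULT_WINDOW;
    }

    public static void main(String[] args) {
        int shards = 5; // assumed shard count, for illustration only
        for (int from : new int[] {9990, 9999}) {
            System.out.printf("from=%d size=10: %d per shard, %d sorted on the coordinating node, rejected=%b%n",
                    from, docsPerShard(from, 10), docsSortedOnCoordinator(from, 10, shards),
                    exceedsWindow(from, 10));
        }
    }
}
```

For from=9990 this gives 10,000 hits per shard and 50,000 sorted entries, all to return 10 documents; for from=9999 the window check already fails under the default setting.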

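In practice we should avoid deep pagination altogether by capping the number of pages, for example serving at most 100 pages and returning nothing from page 101 on. Users rarely dig that deep anyway: when searching on Taobao or Baidu, most of us look at ten pages or so at most.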

For example, Taobao limits search results to at most 100 pages.
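A page cap like this is easy to enforce before the request ever reaches Elasticsearch. The sketch below is a hypothetical helper (PageLimiter and MAX_PAGES are names invented here) that turns a 1-based page number into a from offset and refuses anything past page 100.

```java
public class PageLimiter {
    // Assumed cap, mirroring the "at most 100 pages" example
    static final int MAX_PAGES = 100;

    // Converts a 1-based page number into a from offset, refusing pages past the cap
    static int fromForPage(int page, int pageSize) {
        if (page < 1 || page > MAX_PAGES) {
            throw new IllegalArgumentException("page must be between 1 and " + MAX_PAGES + ": " + page);
        }
        return (page - 1) * pageSize;
    }

    public static void main(String[] args) {
        System.out.println(fromForPage(1, 10));   // prints 0
        System.out.println(fromForPage(100, 10)); // prints 990
    }
}
```

With a page size of at most 100, a 100-page cap keeps from + size at or below 10,000, so every allowed request stays inside the default result window.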

Scroll query

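From the above we can see that from/size does not suit offline big-data scenarios, so we use another way Elasticsearch provides for retrieving large amounts of data: the scroll query, also known as a cursor query. In JSON it looks like this: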

```
# First request
GET /sms/_search?scroll=5m
{
  "size": 20,
  "query": {
    "bool": {
      "must": [
        { "match": { "userId": "9d995c0b90fe4128896a1a84eca213bf" } }
      ]
    }
  }
}
```

Response:

```
{
  "_scroll_id": "DnF1ZXJ5VGhlbkZldGNoBgAAAAAATJH1FlFTYzlSZ0VNVGdlM2o0T0dTX2tVUncAAAAAAE0-zBZQUVp6Sy04X1J1NjJCaVZfQUhHWjFnAAAAAABMkfYWUVNjOVJnRU1UZ2UzajRPR1Nfa1VSdwAAAAAATXVxFk83UWRhNGg3UmxTQnpXTEUzd0dreXcAAAAAAEyR9xZRU2M5UmdFTVRnZTNqNE9HU19rVVJ3AAAAAABNPs0WUFFaekstOF9SdTYyQmlWX0FIR1oxZw==",
  "took": 6,
  ......
}
```

Take the _scroll_id from the previous response and send it like this to get the next batch:

```
GET /_search/scroll
{
  "scroll": "1m",
  "scroll_id": "DnF1ZXJ5VGhlbkZldGNoBgAAAAAATJH1FlFTYzlSZ0VNVGdlM2o0T0dTX2tVUncAAAAAAE0-zBZQUVp6Sy04X1J1NjJCaVZfQUhHWjFnAAAAAABMkfYWUVNjOVJnRU1UZ2UzajRPR1Nfa1VSdwAAAAAATXVxFk83UWRhNGg3UmxTQnpXTEUzd0dreXcAAAAAAEyR9xZRU2M5UmdFTVRnZTNqNE9HU19rVVJ3AAAAAABNPs0WUFFaekstOF9SdTYyQmlWX0FIR1oxZw=="
}
```

The scroll parameter says how long to keep the search context alive (5 minutes for the first request, renewed for 1 minute by each follow-up), and each follow-up should pass the most recent _scroll_id. Repeat until a response comes back with no hits, at which point all the data has been read.
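The cursor mechanics can be illustrated without a cluster. The following is a toy in-memory model, not the Elasticsearch client: ToyScroll and its methods are hypothetical names, and it ignores keep-alive expiry, shards and scoring. It only mirrors the contract of the requests above: the first search opens a cursor and returns a scroll id, each follow-up with that id returns the next batch, an empty batch means the end, and clearing the id frees the cursor.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Toy in-memory model of the scroll protocol; NOT the Elasticsearch client.
public class ToyScroll {

    // One batch of hits plus the scroll id to send with the next request
    static final class Page {
        final String scrollId;
        final List<Integer> hits;

        Page(String scrollId, List<Integer> hits) {
            this.scrollId = scrollId;
            this.hits = hits;
        }
    }

    private final List<Integer> snapshot;                         // frozen copy of the matching docs
    private final Map<String, Integer> cursors = new HashMap<>(); // scroll id -> next offset
    private final int size;

    ToyScroll(List<Integer> docs, int size) {
        this.snapshot = new ArrayList<>(docs); // later changes to docs are not visible, like a scroll snapshot
        this.size = size;
    }

    // Plays the role of GET /index/_search?scroll=...: opens a cursor and returns the first batch
    Page search() {
        String id = UUID.randomUUID().toString();
        cursors.put(id, 0);
        return scroll(id);
    }

    // Plays the role of GET /_search/scroll: returns the next batch for an open cursor
    Page scroll(String id) {
        Integer offset = cursors.get(id);
        if (offset == null) {
            throw new IllegalStateException("unknown or cleared scroll id: " + id);
        }
        int end = Math.min(offset + size, snapshot.size());
        cursors.put(id, end);
        return new Page(id, snapshot.subList(offset, end));
    }

    // Plays the role of clearing the scroll: frees the cursor
    void clear(String id) {
        cursors.remove(id);
    }

    public static void main(String[] args) {
        List<Integer> docs = new ArrayList<>();
        for (int i = 0; i < 45; i++) {
            docs.add(i);
        }
        ToyScroll server = new ToyScroll(docs, 20);

        List<Integer> all = new ArrayList<>();
        Page page = server.search();
        while (!page.hits.isEmpty()) {           // stop at the first empty batch
            all.addAll(page.hits);
            page = server.scroll(page.scrollId); // always send the most recent id
        }
        server.clear(page.scrollId);
        System.out.println(all.size());          // prints 45
    }
}
```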

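The JSON requests are only meant to build intuition; in practice we drive Elasticsearch from code. RestHighLevelClient is a handy ES client, so next we use it from Java. Add the following dependency to the pom; its version should match your Elasticsearch version: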

```xml
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.0.0</version>
</dependency>
```

Let's go straight to a utility class that wraps the API calls (the full code is at the end). Fetching the data relies on a scroll query, which is built from the following parts:

1. Build the SearchRequest

```java
Scroll scroll = new Scroll(TimeValue.timeValueMillis(SCROLL_TIMEOUT));
// Build the SearchRequest
SearchRequest request = new SearchRequest(indices);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
if (includes != null) {
    // Only fetch the requested fields
    sourceBuilder.fetchSource(includes, null);
}
// Add the query
sourceBuilder.query(query);
// Number of hits per scroll batch
sourceBuilder.size(SIZE);
// Add the sort field
if (orderField != null && !"".equals(orderField.trim())) {
    sourceBuilder.sort(orderField, order);
}
// Attach the scroll and the source builder
request.scroll(scroll);
request.source(sourceBuilder);
```
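The builder above only sorts when orderField is non-blank. When results do not need a particular order, the Elasticsearch reference notes that scroll requests are fastest when sorted by _doc. A minimal sketch of falling back to it, using a hypothetical helper named resolveSortField (the field name createTime is also made up); its result could then be passed to something like sourceBuilder.sort(resolveSortField(orderField), order), assuming order is non-null:

```java
public class ScrollSort {
    // Hypothetical helper: use "_doc" when the caller gives no sort field,
    // since that is the cheapest order for iterating over every document.
    static String resolveSortField(String orderField) {
        if (orderField == null || orderField.trim().isEmpty()) {
            return "_doc";
        }
        return orderField.trim();
    }

    public static void main(String[] args) {
        System.out.println(resolveSortField(null));           // prints _doc
        System.out.println(resolveSortField("  createTime ")); // prints createTime
    }
}
```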

2. Get the response:

```java
SearchResponse searchResponse = client.search(request, RequestOptions.DEFAULT);
// Get the first scroll id (cursor)
String scrollId = searchResponse.getScrollId();
// Get the first batch of hits
SearchHit[] hits = searchResponse.getHits().getHits();
// List that collects the results
List<T> result = new ArrayList<>();
scrollIdList.add(scrollId);
```

3. Loop over the scroll and collect the results:

```java
// Scroll through the results, converting each SearchHit and adding it to result
while (ArrayUtils.isNotEmpty(hits)) {
    for (SearchHit hit : hits) {
        // Function<SearchHit, T>: takes a SearchHit and returns a T
        result.add(fun.apply(hit));
    }
    // Fewer hits than SIZE means the scroll is exhausted, so we can return
    if (hits.length < SIZE) {
        break;
    }
    // Keep scrolling from the previous cursor
    SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollId);
    searchScrollRequest.scroll(scroll);
    // Fetch the next batch
    SearchResponse searchScrollResponse = client.scroll(searchScrollRequest, RequestOptions.DEFAULT);
    // Update the cursor
    scrollId = searchScrollResponse.getScrollId();
    hits = searchScrollResponse.getHits().getHits();
    scrollIdList.add(scrollId);
}
```
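The control flow of this loop (take the first batch, map each hit, stop on a short or empty batch, and clear the scroll in a finally block as the full util does) does not depend on Elasticsearch itself. The sketch below restates it in plain Java so it can be run without a cluster; Batch, drain and the fake server in main are hypothetical stand-ins for SearchResponse, the ESUtil method and client.scroll/clearScroll.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;

// Plain-Java sketch of the scroll loop with the Elasticsearch calls abstracted away.
public class ScrollDrain {

    // Stand-in for a SearchResponse: the scroll id plus one batch of hits
    static final class Batch<H> {
        final String scrollId;
        final List<H> hits;

        Batch(String scrollId, List<H> hits) {
            this.scrollId = scrollId;
            this.hits = hits;
        }
    }

    static <H, T> List<T> drain(Batch<H> first,
                                Function<String, Batch<H>> next, // stands in for client.scroll(...)
                                Consumer<Set<String>> clear,     // stands in for client.clearScroll(...)
                                int size,
                                Function<H, T> fun) {
        Set<String> scrollIds = new LinkedHashSet<>(); // the same id may come back repeatedly
        scrollIds.add(first.scrollId);
        List<T> result = new ArrayList<>();
        Batch<H> batch = first;
        try {
            while (!batch.hits.isEmpty()) {
                for (H hit : batch.hits) {
                    result.add(fun.apply(hit));
                }
                if (batch.hits.size() < size) {
                    break; // a short batch means there is nothing left
                }
                batch = next.apply(batch.scrollId); // continue from the latest id
                scrollIds.add(batch.scrollId);
            }
        } finally {
            clear.accept(scrollIds); // release server-side contexts even if fun throws
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> docs = new ArrayList<>();
        for (int i = 0; i < 25; i++) {
            docs.add(i);
        }
        int size = 10;
        int[] offset = {0};
        // Fake "server" that hands out consecutive slices of docs under one scroll id
        Function<String, Batch<Integer>> next = id -> {
            int from = offset[0];
            int to = Math.min(from + size, docs.size());
            offset[0] = to;
            return new Batch<>(id, docs.subList(from, to));
        };
        Batch<Integer> first = next.apply("scroll-1");
        List<String> out = drain(first, next, ids -> System.out.println("cleared " + ids), size, i -> "doc-" + i);
        System.out.println(out.size()); // prints 25
    }
}
```

Collecting the ids in a set is a small design choice: the original code appends an id after every request, and since the same id may be returned again, a set avoids sending duplicates when clearing.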

The complete util class:

```java
package com.yq.demo.Util;

import org.apache.commons.lang3.ArrayUtils;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
// With the 7.0.0 client from the pom above, TimeValue lives in org.elasticsearch.common.unit;
// newer 7.x clients moved it to org.elasticsearch.core.TimeValue.
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class ESUtil {

    private static final Logger log = LoggerFactory.getLogger(ESUtil.class);
    // Scroll keep-alive: 180000 ms = 3 minutes
    private static final long SCROLL_TIMEOUT = 180000;
    // Number of hits per scroll batch
    private static final int SIZE = 1000;
    // 200 MB (209715200 bytes); not used in this class
    private static final int MAX_BUFFER = 209715200;

    /**
     * Builds a scroll search and collects every hit.
     *
     * @param client     restHighLevelClient
     * @param indices    indices to search
     * @param query      queryBuilder
     * @param includes   fields to include
     * @param orderField sort field
     * @param order      sort order
     * @param fun        mapping function
     * @param <T>        result type
     * @return List of results, each converted to T by fun
     * @throws Exception
     */
    public static <T> List<T> searchResponse(RestHighLevelClient client, String[] indices, QueryBuilder query,
                                             String[] includes, String orderField, SortOrder order,
                                             Function<SearchHit, T> fun) throws Exception {
        // Scroll used by the scroll query
        Scroll scroll = new Scroll(TimeValue.timeValueMillis(SCROLL_TIMEOUT));
        // Build the SearchRequest
        SearchRequest request = new SearchRequest(indices);
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        if (includes != null) {
            // Only fetch the requested fields
            sourceBuilder.fetchSource(includes, null);
        }
        // Add the query
        sourceBuilder.query(query);
        // Number of hits per scroll batch
        sourceBuilder.size(SIZE);
        // Add the sort field
        if (orderField != null && !"".equals(orderField.trim())) {
            sourceBuilder.sort(orderField, order);
        }
        // Attach the scroll and the source builder
        request.scroll(scroll);
        request.source(sourceBuilder);
        // Scroll ids to clear at the end
        List<String> scrollIdList = new ArrayList<>();
        // Initial search
        SearchResponse searchResponse = client.search(request, RequestOptions.DEFAULT);
        // First scroll id (cursor)
        String scrollId = searchResponse.getScrollId();
        // First batch of hits
        SearchHit[] hits = searchResponse.getHits().getHits();
        // Collected results
        List<T> result = new ArrayList<>();
        scrollIdList.add(scrollId);
        try {
            // Scroll through the results, converting each SearchHit and adding it to result
            while (ArrayUtils.isNotEmpty(hits)) {
                for (SearchHit hit : hits) {
                    // Function<SearchHit, T>: takes a SearchHit and returns a T
                    result.add(fun.apply(hit));
                }
                // Fewer hits than SIZE means the scroll is exhausted
                if (hits.length < SIZE) {
                    break;
                }
                // Keep scrolling from the previous cursor
                SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollId);
                searchScrollRequest.scroll(scroll);
                // Fetch the next batch
                SearchResponse searchScrollResponse = client.scroll(searchScrollRequest, RequestOptions.DEFAULT);
                // Update the cursor
                scrollId = searchScrollResponse.getScrollId();
                hits = searchScrollResponse.getHits().getHits();
                scrollIdList.add(scrollId);
            }
        } finally {
            // Clear the scroll to release server-side resources
            ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
            clearScrollRequest.setScrollIds(scrollIdList);
            client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
        }
        return result;
    }

    /**
     * SearchResponse for an aggregation query
     *
     * @param client       restHighLevelClient
     * @param indices      indices to search
     * @param query        QueryBuilder
     * @param aggregations AggregationBuilder
     * @return SearchResponse
     * @throws Exception
     */
    public static SearchResponse searchResponse(RestHighLevelClient client, String[] indices, QueryBuilder query,
                                                AggregationBuilder... aggregations) throws Exception {
        // Build the request
        SearchRequest request = new SearchRequest(indices);
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query(query);
        // Add the aggregations
        if (aggregations != null && aggregations.length > 0) {
            for (AggregationBuilder aggregation : aggregations) {
                sourceBuilder.aggregation(aggregation);
            }
        }
        // Return only aggregation results, no hits
        sourceBuilder.size(0);
        // Ignore unavailable indices and expand wildcards to open indices only
        request.indicesOptions(IndicesOptions.lenientExpandOpen());
        request.source(sourceBuilder);
        return client.search(request, RequestOptions.DEFAULT);
    }
}
```

Hope this helps. Thank you for watching!!! 😆😆😆😆😆😆

