背景
在做elasticsearch集群从原来的2.x版本升级到更新版本如6.x过程中,由于需要在原来的应用中,同时连接2.x的集群以及6.x的集群来做在线动态灰度切流量,保证流量平滑切换,有问题可随时回切;一般在应用侧比较常规的做法是使用elasticsearch提供rest的sdk:Java High Level REST Client,可以最大程度的保证多版本的兼容性;但在切换过程中也遇到了比较一些兼容性的问题,今天主要来讨论一下关于聚合结果处理的问题;
直方图统计2.x集群返回结果无法解决的问题
在应用中有一个根据价格区间的直方图统计服务,统计查询语句如下:
//POST /my-index/my-type/_search
{
"from": 0,
"size": 0,
"timeout": "10000ms",
"aggregations": {
"statPrice": {
"histogram": {
"field": "price",
"interval": 10000,
"min_doc_count": 1
}
}
}
}
正常的出参数据如下:
{
"_shards":{
"total":16,
"failed":0,
"successful":16,
"skipped":0
},
"hits":{
"hits":[
],
"total":243399987,
"max_score":0
},
"took":539,
"timed_out":false,
"aggregations":{
"statPrice":{
"buckets":[
{
"doc_count":123,
"key":0.0
},
{
"doc_count":345,
"key":1000.0
},
{
"doc_count":780,
"key":2000.0
}
]
}
}
}
在应用中通过rest api调用,6.x集群可以正常的获取数据,没有任务问题,这里因为我们使用的elasticsearch-rest-high-level-client
的版本就是6.x的,这个跟es服务的版本是一样;但是在2.x的老集群上调用时,返回异常;提示Could not parse aggregation keyed as [statPrice]
,详细异常如下
Caused by: org.elasticsearch.common.ParsingException: Could not parse aggregation keyed as [statPrice]
at org.elasticsearch.search.aggregations.Aggregations.fromXContent(Aggregations.java:146) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.action.search.SearchResponse.innerFromXContent(SearchResponse.java:289) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.action.search.SearchResponse服务器托管网.fromXContent(SearchResponse.java:248) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:653) ~[elasticsearch-rest-high-level-client-6.3.2.jar:6.3.2]
at org.elasticsearch.client.RestHighLevelClient.lambda$performRequestAndParseEntity$2(RestHighLevelClient.java:508) ~[elasticsearch-rest-high-level-client-6.3.2.jar:6.3.2]
at org.elasticsearch.client.RestHighLevelClient.pe服务器托管网rformRequest(RestHighLevelClient.java:539) ~[elasticsearch-rest-high-level-client-6.3.2.jar:6.3.2]
at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:508) ~[elasticsearch-rest-high-level-client-6.3.2.jar:6.3.2]
at org.elasticsearch.client.RestHighLevelClient.search(RestHighLevelClient.java:404) ~[elasticsearch-rest-high-level-client-6.3.2.jar:6.3.2]
at com.jd.b2b.ware.admin.compare.core.support.IncrementSyncHandler.statCount(IncrementSyncHandler.java:133) ~[b2b-ware-admin-compare-0.0.1-SNAPSHOT.jar:?]
... 48 more
类:org.elasticsearch.search.aggregations.Aggregations.fromXContent的部分代码
public static Aggregations fromXContent(XContentParser parser) throws IOException {
final List aggregations = new ArrayList();
XContentParser.Token token;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.START_OBJECT) {
SetOnce typedAgg = new SetOnce();
String currentField = parser.currentName();
// TYPED_KEYS_DELIMITER为#, 使用 typed_keys 参数在聚合名称前面加上聚合名称类型前缀时使用的分隔符
parseTypedKeysObject(parser, Aggregation.TYPED_KEYS_DELIMITER, Aggregation.class, typedAgg::set);
if (typedAgg.get() != null) {
aggregations.add(typedAgg.get());
} else {
throw new ParsingException(parser.getTokenLocation(),
String.format(Locale.ROOT, "Could not parse aggregation keyed as [%s]", currentField));
}
}
}
return new Aggregations(aggregations);
}
通过对报错处的代码debug以及分析,这里会对结果的聚合名称根据#号,取到对应的聚合类型的名称;先是翻看了官方文档,了解了一下typed_keys
Aggregations 参数的说明;该参数就是指定返回聚合类型;官方文档最早出现是在7.x的版本,但我们在6.x的集群上同样支持该参数;并且增加该参数与比2.x,出参结果除了在集合名称中有聚合类型外,其他都是一样的;
POST /my-index/my-type/_search?typed_keys (或则typed_keys=true 都可以)
这里又仔细回去检查了我们自己的代码,并没有设置typed_keys
参数;本着刨根问底的精神,进行了详细debug,在org.elasticsearch.client.Request#search
找到了该参数的设置;这个参数是固定给入的,并且没有参数可以进行覆盖修改;
static Request search(SearchRequest searchRequest) throws IOException {
String endpoint = endpoint(searchRequest.indices(), searchRequest.types(), "_search");
Params params = Request.Params.builder();
params.putParam("typed_keys", "true");
params.withRouting(searchRequest.routing());
........
到此问题原因基本以找到,解决方案就比较如搞;
方案一:使用LowLevelClient
使用LowLevelClient
应该是解决这类兼容性问题最通用的办法,因为这个LowLevelClient
就是个httpClient,通过给服务端送查询DSL,再获取到出参字符串,基本简单的json字符串解析即可;代码片段如下:
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.from(0).size(0).timeout(TimeValue.timeValueMillis(10000L));
//数据聚合
searchSourceBuilder.query(queryBuilder);
AggregationBuilder aggregationBuilder = AggregationBuilders
.histogram("statPrice")
.field("price")
.interval(1000)
.minDocCount(1);
searchSourceBuilder.aggregation(aggregationBuilder);
log.info("入参:{}", searchSourceBuilder.toString());
//从highLevelClient中获取到LowLevelClient
RestClient client = highLevelClient.getLowLevelClient();
Map params = new HashMap();
HttpEntity entity = new NStringEntity(searchSourceBuilder.toString(), ContentType.APPLICATION_JSON);
Response response = client.performRequest("POST","/my-index/my-type/_search",
params, entity);
String responseBody = EntityUtils.toString(response.getEntity());
//后续的业务处理
方案二:聚合名称前缀聚合类型
针对这个场景,还有一个比较简便的办法;在访问2.x集群时,查询的DSL语句的聚合名字前加上他们需要的前缀即可;查询入参的原来聚合名称是statPrice
,现在改成histogram#statPrice
即可;
// 2.x集群
//POST /my-index/my-type/_search
{
"from": 0,
"size": 0,
"timeout": "10000ms",
"aggregations": {
"histogram#statPrice": {
"histogram": {
"field": "price",
"interval": 10000,
"min_doc_count": 1
}
}
}
}
后续的处理就跟6.x集群一样的,也是通过statPrice
去取聚合的结果;具体还有哪么前缀可以参考org.elasticsearch.client.RestHighLevelClient#getDefaultNamedXContents
这里的注册信息,可以在6.x等更高的版本中加入typed_keys=true
参数,执行一下查询,查看返回的前缘是什么,即可;
参考
- [1] How do you convert an Elasticsearch JSON String Response, with an Aggregation, to an Elasticsearch SearchResponse Object
- [2] 官方 Aggregations 文档
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net
机房租用,北京机房租用,IDC机房托管, http://www.fwqtg.net