阅读文本大概需要3分钟。
Elasticsearch除了可以文档Index操作外,也提供了一次可以操作多个文档Index的API,上一篇已经把单文档的说了,从今天起说一说多文档Index操作。多文档操作的API统称Multi-document APIs
1、 一次性获取多个Index
public static void main(String[] args) throws IOException {
//onstartup
TransportClientclient =getTransportClient();
//继续添加其他地址
MultiGetResponsemultiGetItemResponses = client.prepareMultiGet()
.add("twitter", "tweet", "1","2")
.get();
for(MultiGetItemResponseitemResponse : multiGetItemResponses) {
GetResponse response =itemResponse.getResponse();
if(response.isExists()){
String json =response.getSourceAsString();
System.out.println(json);
}
}
//onshutdown
client.close();
}
运行结果
{"user":"kimchy","postDate":"2013-01-30","message":"tryingout Elasticsearch"}
{"user":"kimchy","postDate":"2013-01-30","message":"tryingout Elasticsearch"}
2、 Bulk API,又称批量API允许在单个请求中索引和删除多个文档
public static void main(String[] args) throws IOException {
//on startup
TransportClient client = getTransportClient();
BulkRequestBuilder bulkRequest = client.prepareBulk();
bulkRequest.add(client.prepareIndex("twitter", "_doc", "3")
.setSource(XContentFactory.jsonBuilder()
.startObject()
.field("user", "zhangsan")
.field("postDate", new Date())
.field("message", "trying out Elasticsearch")
.endObject()
)
);
bulkRequest.add(client.prepareIndex("twitter", "_doc", "4")
.setSource(XContentFactory.jsonBuilder()
.startObject()
.field("user", "lisi")
.field("postDate", new Date())
.field("message", "another post")
.endObject()
)
);
BulkResponse bulkResponse = bulkRequest.get();
if (bulkResponse.hasFailures()) {
// process failures by iterating through each bulk response item
}
//on shutdown
client.close();
}
通过以上代码就可以插入两条Index,可以通过查询Index的API查询到插入成功。
3、Using Bulk Processor,BulkProcessor提供一个基于请求数量和大小或者某个特定时间之后的自动刷新批处理操作接口
BulkProcessor bulkProcessor = BulkProcessor.builder(
client, //增加elasticsearch客户端
new BulkProcessor.Listener() {
@Override
publicvoid beforeBulk(long executionId,BulkRequest request) {
...
}
//调用bulk之前执行 ,例如你可以通过request.numberOfActions()方法知道numberOfActions
@Override
publicvoid afterBulk(long executionId,BulkRequest request, BulkResponse response)
{
...
}
//调用bulk之后执行 ,例如你可以通过request.hasFailures()方法知道是否执行失败
@Override
publicvoid afterBulk(long executionId,BulkRequest request,Throwable failure) {
...
}// 调用失败抛 Throwable
}).setBulkActions(10000) //每次10000请求
.setBulkSize(new ByteSizeValue(5,ByteSizeUnit.MB)) //拆成5mb一块
.setFlushInterval(TimeValue.timeValueSeconds(5))//无论请求数量多少,每5秒钟请求一次。
.setConcurrentRequests(1) //设置并发请求的数量。值为0意味着只允许执行一个请求。值为1意味着允许1并发请求。
.setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100),3))//设置自定义重复请求机制,最开始等待100毫秒,之后成倍更加,重试3次,当一次或多次重复请求失败后因为计算资源不够抛出 EsRejectedExecutionException 异常,可以通过BackoffPolicy.noBackoff()方法关闭重试机制
.build();
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net
机房租用,北京机房租用,IDC机房托管, http://www.fwqtg.net
相关推荐: 一文吃透 Vue 框架教程(上)
MVVM架构 双向绑定机制✅作者简介:2022年博客新星 第八。热爱国学的Java后端开发者,修心和技术同步精进。 🍎个人主页:Java Fans的博客 🍊个人信条:不迁怒,不贰过。小知识,大智慧。 💞当前专栏:前端案例分享专栏 ✨特色专栏:国学周更-心性养成之路 🥭本文内容:一文吃透…