|
@@ -32,6 +32,7 @@ import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.function.BiConsumer;
|
|
|
+import java.util.function.BiFunction;
|
|
|
import java.util.function.Function;
|
|
|
import java.util.regex.Pattern;
|
|
|
import java.util.stream.Collectors;
|
|
@@ -180,6 +181,10 @@ public class FessEsClient implements Client {
|
|
|
|
|
|
protected String scrollForDelete = "1m";
|
|
|
|
|
|
+ protected int sizeForUpdate = 100;
|
|
|
+
|
|
|
+ protected String scrollForUpdate = "1m";
|
|
|
+
|
|
|
protected int maxConfigSyncStatusRetry = 10;
|
|
|
|
|
|
protected int maxEsStatusRetry = 60;
|
|
@@ -643,6 +648,54 @@ public class FessEsClient implements Client {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ public long updateByQuery(final String index, final Function<SearchRequestBuilder, SearchRequestBuilder> option,
|
|
|
+ final BiFunction<UpdateRequestBuilder, SearchHit, UpdateRequestBuilder> builder) {
|
|
|
+
|
|
|
+ final FessConfig fessConfig = ComponentUtil.getFessConfig();
|
|
|
+ SearchResponse response =
|
|
|
+ option.apply(
|
|
|
+ client.prepareSearch(index).setScroll(scrollForUpdate).setSize(sizeForUpdate)
|
|
|
+ .setPreference(Constants.SEARCH_PREFERENCE_LOCAL)).execute()
|
|
|
+ .actionGet(fessConfig.getIndexScrollSearchTimeout());
|
|
|
+
|
|
|
+ int count = 0;
|
|
|
+ String scrollId = response.getScrollId();
|
|
|
+ try {
|
|
|
+ while (scrollId != null) {
|
|
|
+ final SearchHits searchHits = response.getHits();
|
|
|
+ final SearchHit[] hits = searchHits.getHits();
|
|
|
+ if (hits.length == 0) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ final BulkRequestBuilder bulkRequest = client.prepareBulk();
|
|
|
+ for (final SearchHit hit : hits) {
|
|
|
+ final UpdateRequestBuilder requestBuilder =
|
|
|
+ builder.apply(client.prepareUpdate().setIndex(index).setId(hit.getId()), hit);
|
|
|
+ if (requestBuilder != null) {
|
|
|
+ bulkRequest.add(requestBuilder);
|
|
|
+ }
|
|
|
+ count++;
|
|
|
+ }
|
|
|
+ final BulkResponse bulkResponse = bulkRequest.execute().actionGet(fessConfig.getIndexBulkTimeout());
|
|
|
+ if (bulkResponse.hasFailures()) {
|
|
|
+ throw new IllegalBehaviorStateException(bulkResponse.buildFailureMessage());
|
|
|
+ }
|
|
|
+
|
|
|
+ response =
|
|
|
+ client.prepareSearchScroll(scrollId).setScroll(scrollForUpdate).execute()
|
|
|
+ .actionGet(fessConfig.getIndexBulkTimeout());
|
|
|
+ if (!scrollId.equals(response.getScrollId())) {
|
|
|
+ deleteScrollContext(scrollId);
|
|
|
+ }
|
|
|
+ scrollId = response.getScrollId();
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ deleteScrollContext(scrollId);
|
|
|
+ }
|
|
|
+ return count;
|
|
|
+ }
|
|
|
+
|
|
|
public long deleteByQuery(final String index, final QueryBuilder queryBuilder) {
|
|
|
|
|
|
final FessConfig fessConfig = ComponentUtil.getFessConfig();
|
|
@@ -1477,6 +1530,14 @@ public class FessEsClient implements Client {
|
|
|
return client.prepareMultiTermVectors();
|
|
|
}
|
|
|
|
|
|
+ public void setSizeForUpdate(final int sizeForUpdate) {
|
|
|
+ this.sizeForUpdate = sizeForUpdate;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void setScrollForUpdate(final String scrollForUpdate) {
|
|
|
+ this.scrollForUpdate = scrollForUpdate;
|
|
|
+ }
|
|
|
+
|
|
|
public void setSizeForDelete(final int sizeForDelete) {
|
|
|
this.sizeForDelete = sizeForDelete;
|
|
|
}
|