#2025 clear search context

This commit is contained in:
Shinsuke Sugaya 2019-03-03 08:05:43 +09:00
parent 400f565638
commit b599694571
4 changed files with 210 additions and 169 deletions

View file

@ -653,31 +653,45 @@ public class FessEsClient implements Client {
int count = 0;
String scrollId = response.getScrollId();
while (scrollId != null) {
final SearchHits searchHits = response.getHits();
final SearchHit[] hits = searchHits.getHits();
if (hits.length == 0) {
scrollId = null;
break;
}
try {
while (scrollId != null) {
final SearchHits searchHits = response.getHits();
final SearchHit[] hits = searchHits.getHits();
if (hits.length == 0) {
break;
}
final BulkRequestBuilder bulkRequest = client.prepareBulk();
for (final SearchHit hit : hits) {
bulkRequest.add(client.prepareDelete().setIndex(index).setId(hit.getId()));
count++;
}
final BulkResponse bulkResponse = bulkRequest.execute().actionGet(fessConfig.getIndexBulkTimeout());
if (bulkResponse.hasFailures()) {
throw new IllegalBehaviorStateException(bulkResponse.buildFailureMessage());
}
final BulkRequestBuilder bulkRequest = client.prepareBulk();
for (final SearchHit hit : hits) {
bulkRequest.add(client.prepareDelete().setIndex(index).setId(hit.getId()));
count++;
}
final BulkResponse bulkResponse = bulkRequest.execute().actionGet(fessConfig.getIndexBulkTimeout());
if (bulkResponse.hasFailures()) {
throw new IllegalBehaviorStateException(bulkResponse.buildFailureMessage());
}
response =
client.prepareSearchScroll(scrollId).setScroll(scrollForDelete).execute().actionGet(fessConfig.getIndexBulkTimeout());
scrollId = response.getScrollId();
response =
client.prepareSearchScroll(scrollId).setScroll(scrollForDelete).execute()
.actionGet(fessConfig.getIndexBulkTimeout());
if (!scrollId.equals(response.getScrollId())) {
deleteScrollContext(scrollId);
}
scrollId = response.getScrollId();
}
} finally {
deleteScrollContext(scrollId);
}
return count;
}
protected void deleteScrollContext(final String scrollId) {
if (scrollId != null) {
client.prepareClearScroll().addScrollId(scrollId)
.execute(ActionListener.wrap(res -> {}, e -> logger.warn("Failed to clear the scroll context.", e)));
}
}
protected <T> T get(final String index, final String type, final String id, final SearchCondition<GetRequestBuilder> condition,
final SearchResult<T, GetRequestBuilder, GetResponse> searchResult) {
final long startTime = System.currentTimeMillis();

View file

@ -227,32 +227,40 @@ public abstract class EsAbstractBehavior<ENTITY extends Entity, CB extends Condi
}
protected void delegateBulkRequest(final ConditionBean cb, Function<SearchHits, Boolean> handler) {
SearchResponse response = null;
while (true) {
if (response == null) {
final SearchRequestBuilder builder = client.prepareSearch(asEsIndex()).setScroll(scrollForCursor).setSize(sizeForCursor);
final EsAbstractConditionBean esCb = (EsAbstractConditionBean) cb;
if (esCb.getPreference() != null) {
builder.setPreference(esCb.getPreference());
final SearchRequestBuilder builder = client.prepareSearch(asEsIndex()).setScroll(scrollForCursor).setSize(sizeForCursor);
final EsAbstractConditionBean esCb = (EsAbstractConditionBean) cb;
if (esCb.getPreference() != null) {
builder.setPreference(esCb.getPreference());
}
esCb.request().build(builder);
SearchResponse response = esCb.build(builder).execute().actionGet(scrollSearchTimeout);
String scrollId = response.getScrollId();
try {
while (scrollId != null) {
final SearchHits searchHits = response.getHits();
final SearchHit[] hits = searchHits.getHits();
if (hits.length == 0) {
break;
}
esCb.request().build(builder);
response = esCb.build(builder).execute().actionGet(scrollSearchTimeout);
} else {
final String scrollId = response.getScrollId();
response = client.prepareSearchScroll(scrollId).setScroll(scrollForDelete).execute().actionGet(scrollSearchTimeout);
}
final SearchHits searchHits = response.getHits();
final SearchHit[] hits = searchHits.getHits();
if (hits.length == 0) {
break;
}
if (!handler.apply(searchHits)) {
if (response.getScrollId() != null) {
client.prepareClearScroll().addScrollId(response.getScrollId()).execute(ActionListener.wrap(() -> {}));
if (!handler.apply(searchHits)) {
break;
}
break;
response = client.prepareSearchScroll(scrollId).setScroll(scrollForDelete).execute().actionGet(scrollSearchTimeout);
if (!scrollId.equals(response.getScrollId())) {
deleteScrollContext(scrollId);
}
scrollId = response.getScrollId();
}
} finally {
deleteScrollContext(scrollId);
}
}
protected void deleteScrollContext(final String scrollId) {
if (scrollId != null) {
client.prepareClearScroll().addScrollId(scrollId).execute(ActionListener.wrap(() -> {}));
}
}
@ -356,39 +364,40 @@ public abstract class EsAbstractBehavior<ENTITY extends Entity, CB extends Condi
@Override
protected int delegateQueryDelete(final ConditionBean cb, final DeleteOption<? extends ConditionBean> option) {
SearchResponse response = null;
final SearchRequestBuilder builder = client.prepareSearch(asEsIndex()).setScroll(scrollForDelete).setSize(sizeForDelete);
final EsAbstractConditionBean esCb = (EsAbstractConditionBean) cb;
if (esCb.getPreference() != null) {
esCb.setPreference(esCb.getPreference());
}
esCb.request().build(builder);
SearchResponse response = esCb.build(builder).execute().actionGet(scrollSearchTimeout);
String scrollId = response.getScrollId();
int count = 0;
while (true) {
if (response == null) {
final SearchRequestBuilder builder = client.prepareSearch(asEsIndex()).setScroll(scrollForDelete).setSize(sizeForDelete);
final EsAbstractConditionBean esCb = (EsAbstractConditionBean) cb;
if (esCb.getPreference() != null) {
esCb.setPreference(esCb.getPreference());
try {
while (scrollId != null) {
final SearchHits searchHits = response.getHits();
final SearchHit[] hits = searchHits.getHits();
if (hits.length == 0) {
break;
}
esCb.request().build(builder);
response = esCb.build(builder).execute().actionGet(scrollSearchTimeout);
} else {
final String scrollId = response.getScrollId();
response = client.prepareSearchScroll(scrollId).setScroll(scrollForDelete).execute().actionGet(scrollSearchTimeout);
}
final SearchHits searchHits = response.getHits();
final SearchHit[] hits = searchHits.getHits();
if (hits.length == 0) {
break;
}
final BulkRequestBuilder bulkRequest = client.prepareBulk();
for (final SearchHit hit : hits) {
bulkRequest.add(client.prepareDelete().setIndex(asEsIndex()).setId(hit.getId()));
}
count += hits.length;
final BulkResponse bulkResponse = bulkRequest.execute().actionGet(bulkTimeout);
if (bulkResponse.hasFailures()) {
if (response.getScrollId() != null) {
client.prepareClearScroll().addScrollId(response.getScrollId()).execute(ActionListener.wrap(() -> {}));
final BulkRequestBuilder bulkRequest = client.prepareBulk();
for (final SearchHit hit : hits) {
bulkRequest.add(client.prepareDelete().setIndex(asEsIndex()).setId(hit.getId()));
}
count += hits.length;
final BulkResponse bulkResponse = bulkRequest.execute().actionGet(bulkTimeout);
if (bulkResponse.hasFailures()) {
throw new IllegalBehaviorStateException(bulkResponse.buildFailureMessage());
}
response = client.prepareSearchScroll(scrollId).setScroll(scrollForDelete).execute().actionGet(scrollSearchTimeout);
if (!scrollId.equals(response.getScrollId())) {
deleteScrollContext(scrollId);
}
throw new IllegalBehaviorStateException(bulkResponse.buildFailureMessage());
}
} finally {
deleteScrollContext(scrollId);
}
return count;
}

View file

@ -227,32 +227,40 @@ public abstract class EsAbstractBehavior<ENTITY extends Entity, CB extends Condi
}
protected void delegateBulkRequest(final ConditionBean cb, Function<SearchHits, Boolean> handler) {
SearchResponse response = null;
while (true) {
if (response == null) {
final SearchRequestBuilder builder = client.prepareSearch(asEsIndex()).setScroll(scrollForCursor).setSize(sizeForCursor);
final EsAbstractConditionBean esCb = (EsAbstractConditionBean) cb;
if (esCb.getPreference() != null) {
builder.setPreference(esCb.getPreference());
final SearchRequestBuilder builder = client.prepareSearch(asEsIndex()).setScroll(scrollForCursor).setSize(sizeForCursor);
final EsAbstractConditionBean esCb = (EsAbstractConditionBean) cb;
if (esCb.getPreference() != null) {
builder.setPreference(esCb.getPreference());
}
esCb.request().build(builder);
SearchResponse response = esCb.build(builder).execute().actionGet(scrollSearchTimeout);
String scrollId = response.getScrollId();
try {
while (scrollId != null) {
final SearchHits searchHits = response.getHits();
final SearchHit[] hits = searchHits.getHits();
if (hits.length == 0) {
break;
}
esCb.request().build(builder);
response = esCb.build(builder).execute().actionGet(scrollSearchTimeout);
} else {
final String scrollId = response.getScrollId();
response = client.prepareSearchScroll(scrollId).setScroll(scrollForDelete).execute().actionGet(scrollSearchTimeout);
}
final SearchHits searchHits = response.getHits();
final SearchHit[] hits = searchHits.getHits();
if (hits.length == 0) {
break;
}
if (!handler.apply(searchHits)) {
if (response.getScrollId() != null) {
client.prepareClearScroll().addScrollId(response.getScrollId()).execute(ActionListener.wrap(() -> {}));
if (!handler.apply(searchHits)) {
break;
}
break;
response = client.prepareSearchScroll(scrollId).setScroll(scrollForDelete).execute().actionGet(scrollSearchTimeout);
if (!scrollId.equals(response.getScrollId())) {
deleteScrollContext(scrollId);
}
scrollId = response.getScrollId();
}
} finally {
deleteScrollContext(scrollId);
}
}
protected void deleteScrollContext(final String scrollId) {
if (scrollId != null) {
client.prepareClearScroll().addScrollId(scrollId).execute(ActionListener.wrap(() -> {}));
}
}
@ -356,39 +364,40 @@ public abstract class EsAbstractBehavior<ENTITY extends Entity, CB extends Condi
@Override
protected int delegateQueryDelete(final ConditionBean cb, final DeleteOption<? extends ConditionBean> option) {
SearchResponse response = null;
final SearchRequestBuilder builder = client.prepareSearch(asEsIndex()).setScroll(scrollForDelete).setSize(sizeForDelete);
final EsAbstractConditionBean esCb = (EsAbstractConditionBean) cb;
if (esCb.getPreference() != null) {
esCb.setPreference(esCb.getPreference());
}
esCb.request().build(builder);
SearchResponse response = esCb.build(builder).execute().actionGet(scrollSearchTimeout);
String scrollId = response.getScrollId();
int count = 0;
while (true) {
if (response == null) {
final SearchRequestBuilder builder = client.prepareSearch(asEsIndex()).setScroll(scrollForDelete).setSize(sizeForDelete);
final EsAbstractConditionBean esCb = (EsAbstractConditionBean) cb;
if (esCb.getPreference() != null) {
esCb.setPreference(esCb.getPreference());
try {
while (scrollId != null) {
final SearchHits searchHits = response.getHits();
final SearchHit[] hits = searchHits.getHits();
if (hits.length == 0) {
break;
}
esCb.request().build(builder);
response = esCb.build(builder).execute().actionGet(scrollSearchTimeout);
} else {
final String scrollId = response.getScrollId();
response = client.prepareSearchScroll(scrollId).setScroll(scrollForDelete).execute().actionGet(scrollSearchTimeout);
}
final SearchHits searchHits = response.getHits();
final SearchHit[] hits = searchHits.getHits();
if (hits.length == 0) {
break;
}
final BulkRequestBuilder bulkRequest = client.prepareBulk();
for (final SearchHit hit : hits) {
bulkRequest.add(client.prepareDelete().setIndex(asEsIndex()).setId(hit.getId()));
}
count += hits.length;
final BulkResponse bulkResponse = bulkRequest.execute().actionGet(bulkTimeout);
if (bulkResponse.hasFailures()) {
if (response.getScrollId() != null) {
client.prepareClearScroll().addScrollId(response.getScrollId()).execute(ActionListener.wrap(() -> {}));
final BulkRequestBuilder bulkRequest = client.prepareBulk();
for (final SearchHit hit : hits) {
bulkRequest.add(client.prepareDelete().setIndex(asEsIndex()).setId(hit.getId()));
}
count += hits.length;
final BulkResponse bulkResponse = bulkRequest.execute().actionGet(bulkTimeout);
if (bulkResponse.hasFailures()) {
throw new IllegalBehaviorStateException(bulkResponse.buildFailureMessage());
}
response = client.prepareSearchScroll(scrollId).setScroll(scrollForDelete).execute().actionGet(scrollSearchTimeout);
if (!scrollId.equals(response.getScrollId())) {
deleteScrollContext(scrollId);
}
throw new IllegalBehaviorStateException(bulkResponse.buildFailureMessage());
}
} finally {
deleteScrollContext(scrollId);
}
return count;
}

View file

@ -227,32 +227,40 @@ public abstract class EsAbstractBehavior<ENTITY extends Entity, CB extends Condi
}
protected void delegateBulkRequest(final ConditionBean cb, Function<SearchHits, Boolean> handler) {
SearchResponse response = null;
while (true) {
if (response == null) {
final SearchRequestBuilder builder = client.prepareSearch(asEsIndex()).setScroll(scrollForCursor).setSize(sizeForCursor);
final EsAbstractConditionBean esCb = (EsAbstractConditionBean) cb;
if (esCb.getPreference() != null) {
builder.setPreference(esCb.getPreference());
final SearchRequestBuilder builder = client.prepareSearch(asEsIndex()).setScroll(scrollForCursor).setSize(sizeForCursor);
final EsAbstractConditionBean esCb = (EsAbstractConditionBean) cb;
if (esCb.getPreference() != null) {
builder.setPreference(esCb.getPreference());
}
esCb.request().build(builder);
SearchResponse response = esCb.build(builder).execute().actionGet(scrollSearchTimeout);
String scrollId = response.getScrollId();
try {
while (scrollId != null) {
final SearchHits searchHits = response.getHits();
final SearchHit[] hits = searchHits.getHits();
if (hits.length == 0) {
break;
}
esCb.request().build(builder);
response = esCb.build(builder).execute().actionGet(scrollSearchTimeout);
} else {
final String scrollId = response.getScrollId();
response = client.prepareSearchScroll(scrollId).setScroll(scrollForDelete).execute().actionGet(scrollSearchTimeout);
}
final SearchHits searchHits = response.getHits();
final SearchHit[] hits = searchHits.getHits();
if (hits.length == 0) {
break;
}
if (!handler.apply(searchHits)) {
if (response.getScrollId() != null) {
client.prepareClearScroll().addScrollId(response.getScrollId()).execute(ActionListener.wrap(() -> {}));
if (!handler.apply(searchHits)) {
break;
}
break;
response = client.prepareSearchScroll(scrollId).setScroll(scrollForDelete).execute().actionGet(scrollSearchTimeout);
if (!scrollId.equals(response.getScrollId())) {
deleteScrollContext(scrollId);
}
scrollId = response.getScrollId();
}
} finally {
deleteScrollContext(scrollId);
}
}
protected void deleteScrollContext(final String scrollId) {
if (scrollId != null) {
client.prepareClearScroll().addScrollId(scrollId).execute(ActionListener.wrap(() -> {}));
}
}
@ -356,39 +364,40 @@ public abstract class EsAbstractBehavior<ENTITY extends Entity, CB extends Condi
@Override
protected int delegateQueryDelete(final ConditionBean cb, final DeleteOption<? extends ConditionBean> option) {
SearchResponse response = null;
final SearchRequestBuilder builder = client.prepareSearch(asEsIndex()).setScroll(scrollForDelete).setSize(sizeForDelete);
final EsAbstractConditionBean esCb = (EsAbstractConditionBean) cb;
if (esCb.getPreference() != null) {
esCb.setPreference(esCb.getPreference());
}
esCb.request().build(builder);
SearchResponse response = esCb.build(builder).execute().actionGet(scrollSearchTimeout);
String scrollId = response.getScrollId();
int count = 0;
while (true) {
if (response == null) {
final SearchRequestBuilder builder = client.prepareSearch(asEsIndex()).setScroll(scrollForDelete).setSize(sizeForDelete);
final EsAbstractConditionBean esCb = (EsAbstractConditionBean) cb;
if (esCb.getPreference() != null) {
esCb.setPreference(esCb.getPreference());
try {
while (scrollId != null) {
final SearchHits searchHits = response.getHits();
final SearchHit[] hits = searchHits.getHits();
if (hits.length == 0) {
break;
}
esCb.request().build(builder);
response = esCb.build(builder).execute().actionGet(scrollSearchTimeout);
} else {
final String scrollId = response.getScrollId();
response = client.prepareSearchScroll(scrollId).setScroll(scrollForDelete).execute().actionGet(scrollSearchTimeout);
}
final SearchHits searchHits = response.getHits();
final SearchHit[] hits = searchHits.getHits();
if (hits.length == 0) {
break;
}
final BulkRequestBuilder bulkRequest = client.prepareBulk();
for (final SearchHit hit : hits) {
bulkRequest.add(client.prepareDelete().setIndex(asEsIndex()).setId(hit.getId()));
}
count += hits.length;
final BulkResponse bulkResponse = bulkRequest.execute().actionGet(bulkTimeout);
if (bulkResponse.hasFailures()) {
if (response.getScrollId() != null) {
client.prepareClearScroll().addScrollId(response.getScrollId()).execute(ActionListener.wrap(() -> {}));
final BulkRequestBuilder bulkRequest = client.prepareBulk();
for (final SearchHit hit : hits) {
bulkRequest.add(client.prepareDelete().setIndex(asEsIndex()).setId(hit.getId()));
}
count += hits.length;
final BulkResponse bulkResponse = bulkRequest.execute().actionGet(bulkTimeout);
if (bulkResponse.hasFailures()) {
throw new IllegalBehaviorStateException(bulkResponse.buildFailureMessage());
}
response = client.prepareSearchScroll(scrollId).setScroll(scrollForDelete).execute().actionGet(scrollSearchTimeout);
if (!scrollId.equals(response.getScrollId())) {
deleteScrollContext(scrollId);
}
throw new IllegalBehaviorStateException(bulkResponse.buildFailureMessage());
}
} finally {
deleteScrollContext(scrollId);
}
return count;
}