|
@@ -42,14 +42,12 @@ import org.dbflute.util.DfTypeUtil;
|
|
|
import org.elasticsearch.action.bulk.BulkItemResponse;
|
|
|
import org.elasticsearch.action.bulk.BulkRequestBuilder;
|
|
|
import org.elasticsearch.action.bulk.BulkResponse;
|
|
|
-import org.elasticsearch.action.count.CountRequestBuilder;
|
|
|
import org.elasticsearch.action.delete.DeleteRequestBuilder;
|
|
|
import org.elasticsearch.action.delete.DeleteResponse;
|
|
|
import org.elasticsearch.action.index.IndexRequestBuilder;
|
|
|
import org.elasticsearch.action.index.IndexResponse;
|
|
|
import org.elasticsearch.action.search.SearchRequestBuilder;
|
|
|
import org.elasticsearch.action.search.SearchResponse;
|
|
|
-import org.elasticsearch.action.search.SearchType;
|
|
|
import org.elasticsearch.action.update.UpdateRequestBuilder;
|
|
|
import org.elasticsearch.client.Client;
|
|
|
import org.elasticsearch.search.SearchHit;
|
|
@@ -69,6 +67,11 @@ public abstract class EsAbstractBehavior<ENTITY extends Entity, CB extends Condi
|
|
|
protected String scrollForDelete = "1m";
|
|
|
protected int sizeForCursor = 100;
|
|
|
protected String scrollForCursor = "1m";
|
|
|
+ protected String searchTimeout = "3m";
|
|
|
+ protected String indexTimeout = "3m";
|
|
|
+ protected String scrollSearchTimeout = "3m";
|
|
|
+ protected String bulkTimeout = "3m";
|
|
|
+ protected String deleteTimeout = "3m";
|
|
|
|
|
|
protected abstract String asEsIndex();
|
|
|
|
|
@@ -84,8 +87,12 @@ public abstract class EsAbstractBehavior<ENTITY extends Entity, CB extends Condi
|
|
|
@Override
|
|
|
protected int delegateSelectCountUniquely(final ConditionBean cb) {
|
|
|
// #pending check response and cast problem
|
|
|
- final CountRequestBuilder builder = client.prepareCount(asEsIndex()).setTypes(asEsSearchType());
|
|
|
- return (int) ((EsAbstractConditionBean) cb).build(builder).execute().actionGet().getCount();
|
|
|
+ final SearchRequestBuilder builder = client.prepareSearch(asEsIndex()).setTypes(asEsSearchType());
|
|
|
+ final EsAbstractConditionBean esCb = (EsAbstractConditionBean) cb;
|
|
|
+ if (esCb.getPreference() != null) {
|
|
|
+ builder.setPreference(esCb.getPreference());
|
|
|
+ }
|
|
|
+ return (int) esCb.build(builder).execute().actionGet(searchTimeout).getHits().getTotalHits();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -116,8 +123,12 @@ public abstract class EsAbstractBehavior<ENTITY extends Entity, CB extends Condi
|
|
|
}
|
|
|
builder.setFrom(from);
|
|
|
builder.setSize(size);
|
|
|
- ((EsAbstractConditionBean) cb).request().build(builder);
|
|
|
- final SearchResponse response = ((EsAbstractConditionBean) cb).build(builder).execute().actionGet();
|
|
|
+ final EsAbstractConditionBean esCb = (EsAbstractConditionBean) cb;
|
|
|
+ if (esCb.getPreference() != null) {
|
|
|
+ builder.setPreference(esCb.getPreference());
|
|
|
+ }
|
|
|
+ esCb.request().build(builder);
|
|
|
+ final SearchResponse response = esCb.build(builder).execute().actionGet(searchTimeout);
|
|
|
|
|
|
final EsPagingResultBean<RESULT> list = new EsPagingResultBean<>();
|
|
|
final SearchHits searchHits = response.getHits();
|
|
@@ -192,20 +203,24 @@ public abstract class EsAbstractBehavior<ENTITY extends Entity, CB extends Condi
|
|
|
}
|
|
|
|
|
|
protected void delegateBulkRequest(final ConditionBean cb, Function<SearchHits, Boolean> handler) {
|
|
|
- final SearchRequestBuilder builder =
|
|
|
- client.prepareSearch(asEsIndex()).setTypes(asEsIndexType()).setSearchType(SearchType.SCAN).setScroll(scrollForCursor)
|
|
|
- .setSize(sizeForCursor);
|
|
|
- ((EsAbstractConditionBean) cb).request().build(builder);
|
|
|
- final SearchResponse response = ((EsAbstractConditionBean) cb).build(builder).execute().actionGet();
|
|
|
-
|
|
|
- String scrollId = response.getScrollId();
|
|
|
- while (scrollId != null) {
|
|
|
- final SearchResponse scrollResponse = client.prepareSearchScroll(scrollId).setScroll(scrollForDelete).execute().actionGet();
|
|
|
- scrollId = scrollResponse.getScrollId();
|
|
|
- final SearchHits searchHits = scrollResponse.getHits();
|
|
|
+ SearchResponse response = null;
|
|
|
+ while (true) {
|
|
|
+ if (response == null) {
|
|
|
+ final SearchRequestBuilder builder =
|
|
|
+ client.prepareSearch(asEsIndex()).setTypes(asEsIndexType()).setScroll(scrollForCursor).setSize(sizeForCursor);
|
|
|
+ final EsAbstractConditionBean esCb = (EsAbstractConditionBean) cb;
|
|
|
+ if (esCb.getPreference() != null) {
|
|
|
+ builder.setPreference(esCb.getPreference());
|
|
|
+ }
|
|
|
+ esCb.request().build(builder);
|
|
|
+ response = esCb.build(builder).execute().actionGet(scrollSearchTimeout);
|
|
|
+ } else {
|
|
|
+ final String scrollId = response.getScrollId();
|
|
|
+ response = client.prepareSearchScroll(scrollId).setScroll(scrollForDelete).execute().actionGet(scrollSearchTimeout);
|
|
|
+ }
|
|
|
+ final SearchHits searchHits = response.getHits();
|
|
|
final SearchHit[] hits = searchHits.getHits();
|
|
|
if (hits.length == 0) {
|
|
|
- scrollId = null;
|
|
|
break;
|
|
|
}
|
|
|
|
|
@@ -237,7 +252,7 @@ public abstract class EsAbstractBehavior<ENTITY extends Entity, CB extends Condi
|
|
|
final EsAbstractEntity esEntity = (EsAbstractEntity) entity;
|
|
|
IndexRequestBuilder builder = createInsertRequest(esEntity);
|
|
|
|
|
|
- final IndexResponse response = builder.execute().actionGet();
|
|
|
+ final IndexResponse response = builder.execute().actionGet(indexTimeout);
|
|
|
esEntity.asDocMeta().id(response.getId());
|
|
|
return response.isCreated() ? 1 : 0;
|
|
|
}
|
|
@@ -260,7 +275,7 @@ public abstract class EsAbstractBehavior<ENTITY extends Entity, CB extends Condi
|
|
|
final EsAbstractEntity esEntity = (EsAbstractEntity) entity;
|
|
|
final IndexRequestBuilder builder = createUpdateRequest(esEntity);
|
|
|
|
|
|
- final IndexResponse response = builder.execute().actionGet();
|
|
|
+ final IndexResponse response = builder.execute().actionGet(indexTimeout);
|
|
|
long version = response.getVersion();
|
|
|
if (version != -1) {
|
|
|
esEntity.asDocMeta().version(version);
|
|
@@ -287,7 +302,7 @@ public abstract class EsAbstractBehavior<ENTITY extends Entity, CB extends Condi
|
|
|
final EsAbstractEntity esEntity = (EsAbstractEntity) entity;
|
|
|
final DeleteRequestBuilder builder = createDeleteRequest(esEntity);
|
|
|
|
|
|
- final DeleteResponse response = builder.execute().actionGet();
|
|
|
+ final DeleteResponse response = builder.execute().actionGet(deleteTimeout);
|
|
|
return response.isFound() ? 1 : 0;
|
|
|
}
|
|
|
|
|
@@ -302,21 +317,25 @@ public abstract class EsAbstractBehavior<ENTITY extends Entity, CB extends Condi
|
|
|
|
|
|
@Override
|
|
|
protected int delegateQueryDelete(final ConditionBean cb, final DeleteOption<? extends ConditionBean> option) {
|
|
|
- final SearchRequestBuilder builder =
|
|
|
- client.prepareSearch(asEsIndex()).setTypes(asEsIndexType()).setSearchType(SearchType.SCAN).setScroll(scrollForDelete)
|
|
|
- .setSize(sizeForDelete);
|
|
|
- ((EsAbstractConditionBean) cb).request().build(builder);
|
|
|
- final SearchResponse response = ((EsAbstractConditionBean) cb).build(builder).execute().actionGet();
|
|
|
-
|
|
|
+ SearchResponse response = null;
|
|
|
int count = 0;
|
|
|
- String scrollId = response.getScrollId();
|
|
|
- while (scrollId != null) {
|
|
|
- final SearchResponse scrollResponse = client.prepareSearchScroll(scrollId).setScroll(scrollForDelete).execute().actionGet();
|
|
|
- scrollId = scrollResponse.getScrollId();
|
|
|
- final SearchHits searchHits = scrollResponse.getHits();
|
|
|
+ while (true) {
|
|
|
+ if (response == null) {
|
|
|
+ final SearchRequestBuilder builder =
|
|
|
+ client.prepareSearch(asEsIndex()).setTypes(asEsIndexType()).setScroll(scrollForDelete).setSize(sizeForDelete);
|
|
|
+ final EsAbstractConditionBean esCb = (EsAbstractConditionBean) cb;
|
|
|
+ if (esCb.getPreference() != null) {
|
|
|
+ esCb.setPreference(esCb.getPreference());
|
|
|
+ }
|
|
|
+ esCb.request().build(builder);
|
|
|
+ response = esCb.build(builder).execute().actionGet(scrollSearchTimeout);
|
|
|
+ } else {
|
|
|
+ final String scrollId = response.getScrollId();
|
|
|
+ response = client.prepareSearchScroll(scrollId).setScroll(scrollForDelete).execute().actionGet(scrollSearchTimeout);
|
|
|
+ }
|
|
|
+ final SearchHits searchHits = response.getHits();
|
|
|
final SearchHit[] hits = searchHits.getHits();
|
|
|
if (hits.length == 0) {
|
|
|
- scrollId = null;
|
|
|
break;
|
|
|
}
|
|
|
|
|
@@ -325,7 +344,7 @@ public abstract class EsAbstractBehavior<ENTITY extends Entity, CB extends Condi
|
|
|
bulkRequest.add(client.prepareDelete(asEsIndex(), asEsIndexType(), hit.getId()));
|
|
|
}
|
|
|
count += hits.length;
|
|
|
- final BulkResponse bulkResponse = bulkRequest.execute().actionGet();
|
|
|
+ final BulkResponse bulkResponse = bulkRequest.execute().actionGet(bulkTimeout);
|
|
|
if (bulkResponse.hasFailures()) {
|
|
|
throw new IllegalBehaviorStateException(bulkResponse.buildFailureMessage());
|
|
|
}
|
|
@@ -390,7 +409,7 @@ public abstract class EsAbstractBehavior<ENTITY extends Entity, CB extends Condi
|
|
|
builderCall.callback(bulkBuilder);
|
|
|
}
|
|
|
|
|
|
- final BulkResponse response = bulkBuilder.execute().actionGet();
|
|
|
+ final BulkResponse response = bulkBuilder.execute().actionGet(bulkTimeout);
|
|
|
final BulkItemResponse[] itemResponses = response.getItems();
|
|
|
if (itemResponses.length != entityList.size()) {
|
|
|
throw new IllegalStateException("Invalid response size: " + itemResponses.length + " != " + entityList.size());
|
|
@@ -419,6 +438,42 @@ public abstract class EsAbstractBehavior<ENTITY extends Entity, CB extends Condi
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
+ public void setSizeForDelete(int sizeForDelete) {
|
|
|
+ this.sizeForDelete = sizeForDelete;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void setScrollForDelete(String scrollForDelete) {
|
|
|
+ this.scrollForDelete = scrollForDelete;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void setSizeForCursor(int sizeForCursor) {
|
|
|
+ this.sizeForCursor = sizeForCursor;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void setScrollForCursor(String scrollForCursor) {
|
|
|
+ this.scrollForCursor = scrollForCursor;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void setSearchTimeout(String searchTimeout) {
|
|
|
+ this.searchTimeout = searchTimeout;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void setIndexTimeout(String indexTimeout) {
|
|
|
+ this.indexTimeout = indexTimeout;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void setScrollSearchTimeout(String scrollSearchTimeout) {
|
|
|
+ this.scrollSearchTimeout = scrollSearchTimeout;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void setBulkTimeout(String bulkTimeout) {
|
|
|
+ this.bulkTimeout = bulkTimeout;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void setDeleteTimeout(String deleteTimeout) {
|
|
|
+ this.deleteTimeout = deleteTimeout;
|
|
|
+ }
|
|
|
+
|
|
|
// ===================================================================================
|
|
|
// Assist Logic
|
|
|
// ============
|