fix #415 : remove SCAN query

This commit is contained in:
Shinsuke Sugaya 2016-03-05 18:52:25 +09:00
parent 00e40013ef
commit bf0566cf4d
4 changed files with 94 additions and 94 deletions

View file

@ -67,6 +67,7 @@ import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
@ -151,7 +152,6 @@ import org.elasticsearch.common.settings.Settings.Builder;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.get.GetField;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.indices.IndexAlreadyExistsException;
@ -308,9 +308,10 @@ public class FessEsClient implements Client {
final String configType = values[1];
boolean exists = false;
try {
client.prepareExists(configIndex).execute().actionGet(fessConfig.getIndexSearchTimeout());
exists = true;
} catch (final IndexNotFoundException e) {
IndicesExistsResponse response =
client.admin().indices().prepareExists(configIndex).execute().actionGet(fessConfig.getIndexSearchTimeout());
exists = response.isExists();
} catch (final Exception e) {
// ignore
}
if (!exists) {
@ -744,12 +745,14 @@ public class FessEsClient implements Client {
final BulkResponse response = bulkRequestBuilder.execute().actionGet(ComponentUtil.getFessConfig().getIndexBulkTimeout());
if (response.hasFailures()) {
if (logger.isDebugEnabled()) {
@SuppressWarnings("rawtypes")
final List<ActionRequest> requests = bulkRequestBuilder.request().requests();
final BulkItemResponse[] items = response.getItems();
if (requests.size() == items.length) {
for (int i = 0; i < requests.size(); i++) {
final BulkItemResponse resp = items[i];
if (resp.isFailed() && resp.getFailure() != null) {
@SuppressWarnings("rawtypes")
final ActionRequest req = requests.get(i);
final Failure failure = resp.getFailure();
logger.debug("Failed Request: " + req + "\n=>" + failure.getMessage());
@ -1138,31 +1141,37 @@ public class FessEsClient implements Client {
return client.prepareMultiGet();
}
@SuppressWarnings("deprecation")
@Override
public ActionFuture<CountResponse> count(final CountRequest request) {
return client.count(request);
}
@SuppressWarnings("deprecation")
@Override
public void count(final CountRequest request, final ActionListener<CountResponse> listener) {
client.count(request, listener);
}
@SuppressWarnings("deprecation")
@Override
public CountRequestBuilder prepareCount(final String... indices) {
return client.prepareCount(indices);
}
@SuppressWarnings("deprecation")
@Override
public ActionFuture<ExistsResponse> exists(final ExistsRequest request) {
return client.exists(request);
}
@SuppressWarnings("deprecation")
@Override
public void exists(final ExistsRequest request, final ActionListener<ExistsResponse> listener) {
client.exists(request, listener);
}
@SuppressWarnings("deprecation")
@Override
public ExistsRequestBuilder prepareExists(final String... indices) {
return client.prepareExists(indices);
@ -1308,18 +1317,21 @@ public class FessEsClient implements Client {
return client.settings();
}
@SuppressWarnings("rawtypes")
@Override
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> ActionFuture<Response> execute(
final Action<Request, Response, RequestBuilder> action, final Request request) {
return client.execute(action, request);
}
@SuppressWarnings("rawtypes")
@Override
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void execute(
final Action<Request, Response, RequestBuilder> action, final Request request, final ActionListener<Response> listener) {
client.execute(action, request, listener);
}
@SuppressWarnings("rawtypes")
@Override
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> RequestBuilder prepareExecute(
final Action<Request, Response, RequestBuilder> action) {

View file

@ -42,14 +42,12 @@ import org.dbflute.util.DfTypeUtil;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.count.CountRequestBuilder;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.search.SearchHit;
@ -89,8 +87,8 @@ public abstract class EsAbstractBehavior<ENTITY extends Entity, CB extends Condi
@Override
protected int delegateSelectCountUniquely(final ConditionBean cb) {
// #pending check response and cast problem
final CountRequestBuilder builder = client.prepareCount(asEsIndex()).setTypes(asEsSearchType());
return (int) ((EsAbstractConditionBean) cb).build(builder).execute().actionGet(searchTimeout).getCount();
final SearchRequestBuilder builder = client.prepareSearch(asEsIndex()).setTypes(asEsSearchType());
return (int) ((EsAbstractConditionBean) cb).build(builder).execute().actionGet(searchTimeout).getHits().getTotalHits();
}
@Override
@ -197,21 +195,20 @@ public abstract class EsAbstractBehavior<ENTITY extends Entity, CB extends Condi
}
protected void delegateBulkRequest(final ConditionBean cb, Function<SearchHits, Boolean> handler) {
final SearchRequestBuilder builder =
client.prepareSearch(asEsIndex()).setTypes(asEsIndexType()).setSearchType(SearchType.SCAN).setScroll(scrollForCursor)
.setSize(sizeForCursor);
((EsAbstractConditionBean) cb).request().build(builder);
final SearchResponse response = ((EsAbstractConditionBean) cb).build(builder).execute().actionGet(scrollSearchTimeout);
String scrollId = response.getScrollId();
while (scrollId != null) {
final SearchResponse scrollResponse =
client.prepareSearchScroll(scrollId).setScroll(scrollForDelete).execute().actionGet(scrollSearchTimeout);
scrollId = scrollResponse.getScrollId();
final SearchHits searchHits = scrollResponse.getHits();
SearchResponse response = null;
while (true) {
if (response == null) {
final SearchRequestBuilder builder =
client.prepareSearch(asEsIndex()).setTypes(asEsIndexType()).setScroll(scrollForCursor).setSize(sizeForCursor);
((EsAbstractConditionBean) cb).request().build(builder);
response = ((EsAbstractConditionBean) cb).build(builder).execute().actionGet(scrollSearchTimeout);
} else {
final String scrollId = response.getScrollId();
response = client.prepareSearchScroll(scrollId).setScroll(scrollForDelete).execute().actionGet(scrollSearchTimeout);
}
final SearchHits searchHits = response.getHits();
final SearchHit[] hits = searchHits.getHits();
if (hits.length == 0) {
scrollId = null;
break;
}
@ -308,22 +305,21 @@ public abstract class EsAbstractBehavior<ENTITY extends Entity, CB extends Condi
@Override
protected int delegateQueryDelete(final ConditionBean cb, final DeleteOption<? extends ConditionBean> option) {
final SearchRequestBuilder builder =
client.prepareSearch(asEsIndex()).setTypes(asEsIndexType()).setSearchType(SearchType.SCAN).setScroll(scrollForDelete)
.setSize(sizeForDelete);
((EsAbstractConditionBean) cb).request().build(builder);
final SearchResponse response = ((EsAbstractConditionBean) cb).build(builder).execute().actionGet(scrollSearchTimeout);
SearchResponse response = null;
int count = 0;
String scrollId = response.getScrollId();
while (scrollId != null) {
final SearchResponse scrollResponse =
client.prepareSearchScroll(scrollId).setScroll(scrollForDelete).execute().actionGet(scrollSearchTimeout);
scrollId = scrollResponse.getScrollId();
final SearchHits searchHits = scrollResponse.getHits();
while (true) {
if (response == null) {
final SearchRequestBuilder builder =
client.prepareSearch(asEsIndex()).setTypes(asEsIndexType()).setScroll(scrollForDelete).setSize(sizeForDelete);
((EsAbstractConditionBean) cb).request().build(builder);
response = ((EsAbstractConditionBean) cb).build(builder).execute().actionGet(scrollSearchTimeout);
} else {
final String scrollId = response.getScrollId();
response = client.prepareSearchScroll(scrollId).setScroll(scrollForDelete).execute().actionGet(scrollSearchTimeout);
}
final SearchHits searchHits = response.getHits();
final SearchHit[] hits = searchHits.getHits();
if (hits.length == 0) {
scrollId = null;
break;
}

View file

@ -42,14 +42,12 @@ import org.dbflute.util.DfTypeUtil;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.count.CountRequestBuilder;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.search.SearchHit;
@ -89,8 +87,8 @@ public abstract class EsAbstractBehavior<ENTITY extends Entity, CB extends Condi
@Override
protected int delegateSelectCountUniquely(final ConditionBean cb) {
// #pending check response and cast problem
final CountRequestBuilder builder = client.prepareCount(asEsIndex()).setTypes(asEsSearchType());
return (int) ((EsAbstractConditionBean) cb).build(builder).execute().actionGet(searchTimeout).getCount();
final SearchRequestBuilder builder = client.prepareSearch(asEsIndex()).setTypes(asEsSearchType());
return (int) ((EsAbstractConditionBean) cb).build(builder).execute().actionGet(searchTimeout).getHits().getTotalHits();
}
@Override
@ -197,21 +195,20 @@ public abstract class EsAbstractBehavior<ENTITY extends Entity, CB extends Condi
}
protected void delegateBulkRequest(final ConditionBean cb, Function<SearchHits, Boolean> handler) {
final SearchRequestBuilder builder =
client.prepareSearch(asEsIndex()).setTypes(asEsIndexType()).setSearchType(SearchType.SCAN).setScroll(scrollForCursor)
.setSize(sizeForCursor);
((EsAbstractConditionBean) cb).request().build(builder);
final SearchResponse response = ((EsAbstractConditionBean) cb).build(builder).execute().actionGet(scrollSearchTimeout);
String scrollId = response.getScrollId();
while (scrollId != null) {
final SearchResponse scrollResponse =
client.prepareSearchScroll(scrollId).setScroll(scrollForDelete).execute().actionGet(scrollSearchTimeout);
scrollId = scrollResponse.getScrollId();
final SearchHits searchHits = scrollResponse.getHits();
SearchResponse response = null;
while (true) {
if (response == null) {
final SearchRequestBuilder builder =
client.prepareSearch(asEsIndex()).setTypes(asEsIndexType()).setScroll(scrollForCursor).setSize(sizeForCursor);
((EsAbstractConditionBean) cb).request().build(builder);
response = ((EsAbstractConditionBean) cb).build(builder).execute().actionGet(scrollSearchTimeout);
} else {
final String scrollId = response.getScrollId();
response = client.prepareSearchScroll(scrollId).setScroll(scrollForDelete).execute().actionGet(scrollSearchTimeout);
}
final SearchHits searchHits = response.getHits();
final SearchHit[] hits = searchHits.getHits();
if (hits.length == 0) {
scrollId = null;
break;
}
@ -308,22 +305,21 @@ public abstract class EsAbstractBehavior<ENTITY extends Entity, CB extends Condi
@Override
protected int delegateQueryDelete(final ConditionBean cb, final DeleteOption<? extends ConditionBean> option) {
final SearchRequestBuilder builder =
client.prepareSearch(asEsIndex()).setTypes(asEsIndexType()).setSearchType(SearchType.SCAN).setScroll(scrollForDelete)
.setSize(sizeForDelete);
((EsAbstractConditionBean) cb).request().build(builder);
final SearchResponse response = ((EsAbstractConditionBean) cb).build(builder).execute().actionGet(scrollSearchTimeout);
SearchResponse response = null;
int count = 0;
String scrollId = response.getScrollId();
while (scrollId != null) {
final SearchResponse scrollResponse =
client.prepareSearchScroll(scrollId).setScroll(scrollForDelete).execute().actionGet(scrollSearchTimeout);
scrollId = scrollResponse.getScrollId();
final SearchHits searchHits = scrollResponse.getHits();
while (true) {
if (response == null) {
final SearchRequestBuilder builder =
client.prepareSearch(asEsIndex()).setTypes(asEsIndexType()).setScroll(scrollForDelete).setSize(sizeForDelete);
((EsAbstractConditionBean) cb).request().build(builder);
response = ((EsAbstractConditionBean) cb).build(builder).execute().actionGet(scrollSearchTimeout);
} else {
final String scrollId = response.getScrollId();
response = client.prepareSearchScroll(scrollId).setScroll(scrollForDelete).execute().actionGet(scrollSearchTimeout);
}
final SearchHits searchHits = response.getHits();
final SearchHit[] hits = searchHits.getHits();
if (hits.length == 0) {
scrollId = null;
break;
}

View file

@ -42,14 +42,12 @@ import org.dbflute.util.DfTypeUtil;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.count.CountRequestBuilder;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.search.SearchHit;
@ -89,8 +87,8 @@ public abstract class EsAbstractBehavior<ENTITY extends Entity, CB extends Condi
@Override
protected int delegateSelectCountUniquely(final ConditionBean cb) {
// #pending check response and cast problem
final CountRequestBuilder builder = client.prepareCount(asEsIndex()).setTypes(asEsSearchType());
return (int) ((EsAbstractConditionBean) cb).build(builder).execute().actionGet(searchTimeout).getCount();
final SearchRequestBuilder builder = client.prepareSearch(asEsIndex()).setTypes(asEsSearchType());
return (int) ((EsAbstractConditionBean) cb).build(builder).execute().actionGet(searchTimeout).getHits().getTotalHits();
}
@Override
@ -197,21 +195,20 @@ public abstract class EsAbstractBehavior<ENTITY extends Entity, CB extends Condi
}
protected void delegateBulkRequest(final ConditionBean cb, Function<SearchHits, Boolean> handler) {
final SearchRequestBuilder builder =
client.prepareSearch(asEsIndex()).setTypes(asEsIndexType()).setSearchType(SearchType.SCAN).setScroll(scrollForCursor)
.setSize(sizeForCursor);
((EsAbstractConditionBean) cb).request().build(builder);
final SearchResponse response = ((EsAbstractConditionBean) cb).build(builder).execute().actionGet(scrollSearchTimeout);
String scrollId = response.getScrollId();
while (scrollId != null) {
final SearchResponse scrollResponse =
client.prepareSearchScroll(scrollId).setScroll(scrollForDelete).execute().actionGet(scrollSearchTimeout);
scrollId = scrollResponse.getScrollId();
final SearchHits searchHits = scrollResponse.getHits();
SearchResponse response = null;
while (true) {
if (response == null) {
final SearchRequestBuilder builder =
client.prepareSearch(asEsIndex()).setTypes(asEsIndexType()).setScroll(scrollForCursor).setSize(sizeForCursor);
((EsAbstractConditionBean) cb).request().build(builder);
response = ((EsAbstractConditionBean) cb).build(builder).execute().actionGet(scrollSearchTimeout);
} else {
final String scrollId = response.getScrollId();
response = client.prepareSearchScroll(scrollId).setScroll(scrollForDelete).execute().actionGet(scrollSearchTimeout);
}
final SearchHits searchHits = response.getHits();
final SearchHit[] hits = searchHits.getHits();
if (hits.length == 0) {
scrollId = null;
break;
}
@ -308,22 +305,21 @@ public abstract class EsAbstractBehavior<ENTITY extends Entity, CB extends Condi
@Override
protected int delegateQueryDelete(final ConditionBean cb, final DeleteOption<? extends ConditionBean> option) {
final SearchRequestBuilder builder =
client.prepareSearch(asEsIndex()).setTypes(asEsIndexType()).setSearchType(SearchType.SCAN).setScroll(scrollForDelete)
.setSize(sizeForDelete);
((EsAbstractConditionBean) cb).request().build(builder);
final SearchResponse response = ((EsAbstractConditionBean) cb).build(builder).execute().actionGet(scrollSearchTimeout);
SearchResponse response = null;
int count = 0;
String scrollId = response.getScrollId();
while (scrollId != null) {
final SearchResponse scrollResponse =
client.prepareSearchScroll(scrollId).setScroll(scrollForDelete).execute().actionGet(scrollSearchTimeout);
scrollId = scrollResponse.getScrollId();
final SearchHits searchHits = scrollResponse.getHits();
while (true) {
if (response == null) {
final SearchRequestBuilder builder =
client.prepareSearch(asEsIndex()).setTypes(asEsIndexType()).setScroll(scrollForDelete).setSize(sizeForDelete);
((EsAbstractConditionBean) cb).request().build(builder);
response = ((EsAbstractConditionBean) cb).build(builder).execute().actionGet(scrollSearchTimeout);
} else {
final String scrollId = response.getScrollId();
response = client.prepareSearchScroll(scrollId).setScroll(scrollForDelete).execute().actionGet(scrollSearchTimeout);
}
final SearchHits searchHits = response.getHits();
final SearchHit[] hits = searchHits.getHits();
if (hits.length == 0) {
scrollId = null;
break;
}