fix #469 Scroll Search support

This commit is contained in:
Shinsuke Sugaya 2018-02-11 15:38:36 +09:00
parent 705ac80ee5
commit cff518ad60
8 changed files with 194 additions and 7 deletions

View file

@ -29,6 +29,7 @@ import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.codelibs.core.exception.IORuntimeException;
import org.codelibs.core.lang.StringUtil;
import org.codelibs.fess.Constants;
import org.codelibs.fess.api.BaseJsonApiManager;
@ -113,12 +114,71 @@ public class JsonApiManager extends BaseJsonApiManager {
case PING:
processPingRequest(request, response, chain);
break;
case SCROLL:
processScrollSearchRequest(request, response, chain);
break;
default:
writeJsonResponse(99, StringUtil.EMPTY, "Not found.");
break;
}
}
protected void processScrollSearchRequest(final HttpServletRequest request, final HttpServletResponse response, final FilterChain chain) {
final SearchService searchService = ComponentUtil.getComponent(SearchService.class);
final FessConfig fessConfig = ComponentUtil.getFessConfig();
if (!fessConfig.isApiSearchScroll()) {
writeJsonResponse(99, StringUtil.EMPTY, "Scroll Search is not available.");
return;
}
final StringBuilder buf = new StringBuilder(1000);
request.setAttribute(Constants.SEARCH_LOG_ACCESS_TYPE, Constants.SEARCH_LOG_ACCESS_TYPE_JSON);
final JsonRequestParams params = new JsonRequestParams(request, fessConfig);
try {
response.setContentType("application/x-ndjson; charset=UTF-8");
final long count =
searchService.scrollSearch(params, doc -> {
buf.setLength(0);
buf.append('{');
boolean first2 = true;
for (final Map.Entry<String, Object> entry : doc.entrySet()) {
final String name = entry.getKey();
if (StringUtil.isNotBlank(name) && entry.getValue() != null
&& ComponentUtil.getQueryHelper().isApiResponseField(name)) {
if (!first2) {
buf.append(',');
} else {
first2 = false;
}
buf.append(escapeJson(name));
buf.append(':');
buf.append(escapeJson(entry.getValue()));
}
}
buf.append('}');
buf.append('\n');
try {
response.getWriter().print(buf.toString());
} catch (IOException e) {
throw new IORuntimeException(e);
}
return true;
}, OptionalThing.empty());
response.flushBuffer();
if (logger.isDebugEnabled()) {
logger.debug("Loaded " + count + " docs");
}
} catch (Exception e) {
int status = 9;
if (logger.isDebugEnabled()) {
logger.debug("Failed to process a ping request.", e);
}
writeJsonResponse(status, null, e);
}
}
protected void processPingRequest(final HttpServletRequest request, final HttpServletResponse response, final FilterChain chain) {
final FessEsClient fessEsClient = ComponentUtil.getFessEsClient();
int status;

View file

@ -16,7 +16,9 @@
package org.codelibs.fess.app.service;
import java.text.NumberFormat;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
@ -24,6 +26,8 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
@ -53,6 +57,7 @@ import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.common.document.DocumentField;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.lastaflute.taglib.function.LaFunctions;
@ -173,7 +178,50 @@ public class SearchService {
}
public int deleteByQuery(final HttpServletRequest request, final SearchRequestParams params) {
public long scrollSearch(final SearchRequestParams params, final Function<Map<String, Object>, Boolean> cursor,
final OptionalThing<FessUserBean> userBean) {
final FessConfig fessConfig = ComponentUtil.getFessConfig();
LaRequestUtil.getOptionalRequest().ifPresent(request -> {
request.setAttribute(Constants.REQUEST_LANGUAGES, params.getLanguages());
request.setAttribute(Constants.REQUEST_QUERIES, params.getQuery());
});
final String query =
QueryStringBuilder.query(params.getQuery()).extraQueries(params.getExtraQueries()).fields(params.getFields()).build();
final int pageSize = params.getPageSize();
final String sortField = params.getSort();
return fessEsClient.<Map<String, Object>> scrollSearch(
fessConfig.getIndexDocumentSearchIndex(),
fessConfig.getIndexDocumentType(),
searchRequestBuilder -> {
fessConfig.processSearchPreference(searchRequestBuilder, userBean);
return SearchConditionBuilder.builder(searchRequestBuilder)
.query(StringUtil.isBlank(sortField) ? query : query + " sort:" + sortField).size(pageSize)
.responseFields(queryHelper.getResponseFields()).searchRequestType(params.getType()).build();
},
(searchResponse, hit) -> {
final Map<String, Object> source = hit.getSourceAsMap();
if (source != null) {
final Map<String, Object> docMap = new HashMap<>(source);
docMap.put(fessConfig.getIndexFieldId(), hit.getId());
docMap.put(fessConfig.getIndexFieldVersion(), hit.getVersion());
return docMap;
}
final Map<String, DocumentField> fields = hit.getFields();
if (fields != null) {
final Map<String, Object> docMap =
fields.entrySet().stream()
.collect(Collectors.toMap(e -> e.getKey(), e -> (Object) e.getValue().getValues()));
docMap.put(fessConfig.getIndexFieldId(), hit.getId());
docMap.put(fessConfig.getIndexFieldVersion(), hit.getVersion());
return docMap;
}
return Collections.emptyMap();
}, cursor);
}
public long deleteByQuery(final HttpServletRequest request, final SearchRequestParams params) {
final String query =
QueryStringBuilder.query(params.getQuery()).extraQueries(params.getExtraQueries()).fields(params.getFields()).build();

View file

@ -209,7 +209,7 @@ public class ApiAdminSearchlistAction extends FessApiAdminAction {
throwValidationErrorApi(messages -> messages.addErrorsInvalidQueryUnknown(GLOBAL));
}
try {
final int count = searchService.deleteByQuery(request, body);
final long count = searchService.deleteByQuery(request, body);
return asJson(new ApiDeleteResponse().count(count).status(Status.OK).result());
} catch (final InvalidQueryException e) {
if (logger.isDebugEnabled()) {

View file

@ -31,6 +31,7 @@ import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@ -177,6 +178,8 @@ public class FessEsClient implements Client {
protected Map<String, List<String>> configListMap = new HashMap<>();
protected String scrollForSearch = "1m";
protected int sizeForDelete = 100;
protected String scrollForDelete = "1m";
@ -630,7 +633,7 @@ public class FessEsClient implements Client {
}
}
public int deleteByQuery(final String index, final String type, final QueryBuilder queryBuilder) {
public long deleteByQuery(final String index, final String type, final QueryBuilder queryBuilder) {
final FessConfig fessConfig = ComponentUtil.getFessConfig();
SearchResponse response =
@ -652,8 +655,8 @@ public class FessEsClient implements Client {
final BulkRequestBuilder bulkRequest = client.prepareBulk();
for (final SearchHit hit : hits) {
bulkRequest.add(client.prepareDelete(index, type, hit.getId()));
count++;
}
count += hits.length;
final BulkResponse bulkResponse = bulkRequest.execute().actionGet(fessConfig.getIndexBulkTimeout());
if (bulkResponse.hasFailures()) {
throw new IllegalBehaviorStateException(bulkResponse.buildFailureMessage());
@ -709,6 +712,51 @@ public class FessEsClient implements Client {
return searchResult.build(searchRequestBuilder, execTime, OptionalEntity.ofNullable(searchResponse, () -> {}));
}
public <T> long scrollSearch(final String index, final String type, final SearchCondition<SearchRequestBuilder> condition,
final EntityCreator<T, SearchResponse, SearchHit> creator, final Function<T, Boolean> cursor) {
long count = 0;
final SearchRequestBuilder searchRequestBuilder = client.prepareSearch(index).setTypes(type).setScroll(scrollForSearch);
if (condition.build(searchRequestBuilder)) {
final FessConfig fessConfig = ComponentUtil.getFessConfig();
try {
if (logger.isDebugEnabled()) {
logger.debug("Query DSL:\n" + searchRequestBuilder.toString());
}
SearchResponse response = searchRequestBuilder.execute().actionGet(ComponentUtil.getFessConfig().getIndexSearchTimeout());
String scrollId = response.getScrollId();
while (scrollId != null) {
final SearchHits searchHits = response.getHits();
final SearchHit[] hits = searchHits.getHits();
if (hits.length == 0) {
scrollId = null;
break;
}
for (final SearchHit hit : hits) {
count++;
if (!cursor.apply(creator.build(response, hit))) {
scrollId = null;
break;
}
}
response =
client.prepareSearchScroll(scrollId).setScroll(scrollForDelete).execute()
.actionGet(fessConfig.getIndexBulkTimeout());
scrollId = response.getScrollId();
}
} catch (final SearchPhaseExecutionException e) {
throw new InvalidQueryException(messages -> messages.addErrorsInvalidQueryParseError(UserMessages.GLOBAL_PROPERTY_KEY),
"Invalid query: " + searchRequestBuilder, e);
}
}
return count;
}
public OptionalEntity<Map<String, Object>> getDocument(final String index, final String type,
final SearchCondition<SearchRequestBuilder> condition) {
return getDocument(
@ -1378,6 +1426,10 @@ public class FessEsClient implements Client {
this.scrollForDelete = scrollForDelete;
}
public void setScrollForSearch(String scrollForSearch) {
this.scrollForSearch = scrollForSearch;
}
@Override
public Client filterWithHeader(final Map<String, String> headers) {
return client.filterWithHeader(headers);

View file

@ -266,7 +266,7 @@ public class DataIndexHelper {
final FessEsClient fessEsClient = ComponentUtil.getFessEsClient();
final String index = fessConfig.getIndexDocumentUpdateIndex();
fessEsClient.admin().indices().prepareRefresh(index).execute().actionGet();
final int numOfDeleted = fessEsClient.deleteByQuery(index, fessConfig.getIndexDocumentType(), queryBuilder);
final long numOfDeleted = fessEsClient.deleteByQuery(index, fessConfig.getIndexDocumentType(), queryBuilder);
logger.info("Deleted {} old docs.", numOfDeleted);
} catch (final Exception e) {
logger.error("Could not delete old docs at " + dataConfig, e);

View file

@ -140,13 +140,13 @@ public class IndexingHelper {
return fessEsClient.delete(fessConfig.getIndexDocumentUpdateIndex(), fessConfig.getIndexDocumentType(), id, 0);
}
public int deleteDocumentByUrl(final FessEsClient fessEsClient, final String url) {
public long deleteDocumentByUrl(final FessEsClient fessEsClient, final String url) {
final FessConfig fessConfig = ComponentUtil.getFessConfig();
return fessEsClient.deleteByQuery(fessConfig.getIndexDocumentUpdateIndex(), fessConfig.getIndexDocumentType(),
QueryBuilders.termQuery(fessConfig.getIndexFieldUrl(), url));
}
public int deleteDocumentsByDocId(final FessEsClient fessEsClient, final List<String> docIdList) {
public long deleteDocumentsByDocId(final FessEsClient fessEsClient, final List<String> docIdList) {
final FessConfig fessConfig = ComponentUtil.getFessConfig();
return fessEsClient.deleteByQuery(fessConfig.getIndexDocumentUpdateIndex(), fessConfig.getIndexDocumentType(), QueryBuilders
.idsQuery(fessConfig.getIndexDocumentType()).addIds(docIdList.stream().toArray(n -> new String[n])));

View file

@ -160,6 +160,9 @@ public interface FessConfig extends FessEnv, org.codelibs.fess.mylasta.direction
/** The key of the configuration. e.g. */
String API_SEARCH_ACCEPT_REFERERS = "api.search.accept.referers";
/** The key of the configuration. e.g. false */
String API_SEARCH_SCROLL = "api.search.scroll";
/** The key of the configuration. e.g. */
String VIRTUAL_HOST_HEADERS = "virtual.host.headers";
@ -1648,6 +1651,20 @@ public interface FessConfig extends FessEnv, org.codelibs.fess.mylasta.direction
*/
Integer getApiSearchAcceptReferersAsInteger();
/**
* Get the value for the key 'api.search.scroll'. <br>
* The value is, e.g. false <br>
* @return The value of found property. (NotNull: if not found, exception but basically no way)
*/
String getApiSearchScroll();
/**
* Is the property for the key 'api.search.scroll' true? <br>
* The value is, e.g. false <br>
* @return The determination, true or false. (if not found, exception but basically no way)
*/
boolean isApiSearchScroll();
/**
* Get the value for the key 'virtual.host.headers'. <br>
* The value is, e.g. <br>
@ -5688,6 +5705,14 @@ public interface FessConfig extends FessEnv, org.codelibs.fess.mylasta.direction
return getAsInteger(FessConfig.API_SEARCH_ACCEPT_REFERERS);
}
public String getApiSearchScroll() {
return get(FessConfig.API_SEARCH_SCROLL);
}
public boolean isApiSearchScroll() {
return is(FessConfig.API_SEARCH_SCROLL);
}
public String getVirtualHostHeaders() {
return get(FessConfig.VIRTUAL_HOST_HEADERS);
}
@ -7813,6 +7838,7 @@ public interface FessConfig extends FessEnv, org.codelibs.fess.mylasta.direction
defaultMap.put(FessConfig.API_ACCESS_TOKEN_REQUEST_PARAMETER, "");
defaultMap.put(FessConfig.API_ADMIN_ACCESS_PERMISSIONS, "Radmin-api");
defaultMap.put(FessConfig.API_SEARCH_ACCEPT_REFERERS, "");
defaultMap.put(FessConfig.API_SEARCH_SCROLL, "false");
defaultMap.put(FessConfig.VIRTUAL_HOST_HEADERS, "");
defaultMap.put(FessConfig.HTTP_PROXY_HOST, "");
defaultMap.put(FessConfig.HTTP_PROXY_PORT, "8080");

View file

@ -99,6 +99,7 @@ api.access.token.required=false
api.access.token.request.parameter=
api.admin.access.permissions=Radmin-api
api.search.accept.referers=
api.search.scroll=false
# Virtual Host: Host:fess.codelibs.org=fess
virtual.host.headers=