fix #2833 Add bulk document indexing API to Admin API

This commit is contained in:
Shinsuke Sugaya 2024-07-24 17:54:24 +09:00
parent 93354ce179
commit 11beb70eb5
9 changed files with 258 additions and 68 deletions

View file

@ -271,7 +271,7 @@ public class AdminSearchlistAction extends FessAdminAction {
public HtmlResponse create(final CreateForm form) {
verifyCrudMode(form.crudMode, CrudMode.CREATE);
validate(form, messages -> {}, this::asEditHtml);
validateFields(form, v -> throwValidationError(v, this::asEditHtml));
validateFields(form.doc, v -> throwValidationError(v, this::asEditHtml));
verifyToken(this::asEditHtml);
getDoc(form).ifPresent(entity -> {
try {
@ -299,7 +299,7 @@ public class AdminSearchlistAction extends FessAdminAction {
public HtmlResponse update(final EditForm form) {
verifyCrudMode(form.crudMode, CrudMode.EDIT);
validate(form, messages -> {}, this::asEditHtml);
validateFields(form, v -> throwValidationError(v, this::asEditHtml));
validateFields(form.doc, v -> throwValidationError(v, this::asEditHtml));
verifyToken(this::asEditHtml);
getDoc(form).ifPresent(entity -> {
final String index = fessConfig.getIndexDocumentUpdateIndex();
@ -334,37 +334,37 @@ public class AdminSearchlistAction extends FessAdminAction {
// ===================================================================================
// Validation
// =========
public static void validateFields(final CreateForm form, final Consumer<VaMessenger<FessMessages>> throwError) {
public static void validateFields(final Map<String, Object> doc, final Consumer<VaMessenger<FessMessages>> throwError) {
final FessConfig fessConfig = ComponentUtil.getFessConfig();
try {
if (!fessConfig.validateIndexRequiredFields(form.doc)) {
throwError.accept(messages -> fessConfig.invalidIndexRequiredFields(form.doc).stream().map(s -> "doc." + s)
if (!fessConfig.validateIndexRequiredFields(doc)) {
throwError.accept(messages -> fessConfig.invalidIndexRequiredFields(doc).stream().map(s -> "doc." + s)
.forEach(s -> messages.addErrorsPropertyRequired(s, s)));
}
if (!fessConfig.validateIndexArrayFields(form.doc)) {
throwError.accept(messages -> fessConfig.invalidIndexArrayFields(form.doc).stream().map(s -> "doc." + s)
if (!fessConfig.validateIndexArrayFields(doc)) {
throwError.accept(messages -> fessConfig.invalidIndexArrayFields(doc).stream().map(s -> "doc." + s)
.forEach(s -> messages.addErrorsPropertyRequired(s, s)));
}
if (!fessConfig.validateIndexDateFields(form.doc)) {
throwError.accept(messages -> fessConfig.invalidIndexDateFields(form.doc).stream().map(s -> "doc." + s)
if (!fessConfig.validateIndexDateFields(doc)) {
throwError.accept(messages -> fessConfig.invalidIndexDateFields(doc).stream().map(s -> "doc." + s)
.forEach(s -> messages.addErrorsPropertyTypeDate(s, s)));
}
if (!fessConfig.validateIndexIntegerFields(form.doc)) {
throwError.accept(messages -> fessConfig.invalidIndexIntegerFields(form.doc).stream().map(s -> "doc." + s)
if (!fessConfig.validateIndexIntegerFields(doc)) {
throwError.accept(messages -> fessConfig.invalidIndexIntegerFields(doc).stream().map(s -> "doc." + s)
.forEach(s -> messages.addErrorsPropertyTypeInteger(s, s)));
}
if (!fessConfig.validateIndexLongFields(form.doc)) {
throwError.accept(messages -> fessConfig.invalidIndexLongFields(form.doc).stream().map(s -> "doc." + s)
if (!fessConfig.validateIndexLongFields(doc)) {
throwError.accept(messages -> fessConfig.invalidIndexLongFields(doc).stream().map(s -> "doc." + s)
.forEach(s -> messages.addErrorsPropertyTypeLong(s, s)));
}
if (!fessConfig.validateIndexFloatFields(form.doc)) {
throwError.accept(messages -> fessConfig.invalidIndexFloatFields(form.doc).stream().map(s -> "doc." + s)
if (!fessConfig.validateIndexFloatFields(doc)) {
throwError.accept(messages -> fessConfig.invalidIndexFloatFields(doc).stream().map(s -> "doc." + s)
.forEach(s -> messages.addErrorsPropertyTypeFloat(s, s)));
}
if (!fessConfig.validateIndexDoubleFields(form.doc)) {
throwError.accept(messages -> fessConfig.invalidIndexDoubleFields(form.doc).stream().map(s -> "doc." + s)
if (!fessConfig.validateIndexDoubleFields(doc)) {
throwError.accept(messages -> fessConfig.invalidIndexDoubleFields(doc).stream().map(s -> "doc." + s)
.forEach(s -> messages.addErrorsPropertyTypeDouble(s, s)));
}
} catch (final Exception e) {

View file

@ -39,7 +39,7 @@ public class ApiResult {
}
public enum Status {
OK(0), BAD_REQUEST(1), SYSTEM_ERROR(2), UNAUTHORIZED(3);
OK(0), BAD_REQUEST(1), SYSTEM_ERROR(2), UNAUTHORIZED(3), FAILED(9);
private final int id;
@ -391,4 +391,18 @@ public class ApiResult {
return new ApiResult(this);
}
}
/**
 * Response builder for bulk operations: on top of what {@code ApiResponse}
 * provides, it carries a list of per-item result maps.
 */
public static class ApiBulkResponse extends ApiResponse {

    /** Per-item result entries; the list is stored by reference, not copied. */
    protected List<Map<String, Object>> items;

    /**
     * Stores the per-item results on this response.
     *
     * @param items one map per processed item
     * @return this instance, to allow call chaining
     */
    public ApiBulkResponse items(final List<Map<String, Object>> items) {
        this.items = items;
        return this;
    }

    /**
     * Wraps this response in an {@code ApiResult}.
     *
     * @return a new {@code ApiResult} built from this response
     */
    @Override
    public ApiResult result() {
        return new ApiResult(this);
    }
}
}

View file

@ -0,0 +1,110 @@
/*
* Copyright 2012-2024 CodeLibs Project and the Others.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language
* governing permissions and limitations under the License.
*/
package org.codelibs.fess.app.web.api.admin.documents;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Resource;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.codelibs.fess.app.web.admin.searchlist.AdminSearchlistAction;
import org.codelibs.fess.app.web.api.ApiResult;
import org.codelibs.fess.app.web.api.ApiResult.ApiBulkResponse;
import org.codelibs.fess.app.web.api.ApiResult.Status;
import org.codelibs.fess.app.web.api.admin.FessApiAdminAction;
import org.codelibs.fess.app.web.api.admin.searchlist.ApiAdminSearchlistAction;
import org.codelibs.fess.es.client.SearchEngineClient;
import org.codelibs.fess.helper.CrawlingConfigHelper;
import org.codelibs.fess.helper.CrawlingInfoHelper;
import org.codelibs.fess.thumbnail.ThumbnailManager;
import org.codelibs.fess.util.ComponentUtil;
import org.lastaflute.web.Execute;
import org.lastaflute.web.response.JsonResponse;
import org.opensearch.action.bulk.BulkResponse;
/**
 * Admin API action that indexes several documents in a single bulk request.
 */
public class ApiAdminDocumentsAction extends FessApiAdminAction {

    // ===================================================================================
    //                                                                            Constant
    //                                                                            ========
    // Logger is bound to this class so that log output is attributed to the bulk documents action.
    private static final Logger logger = LogManager.getLogger(ApiAdminDocumentsAction.class);

    // ===================================================================================
    //                                                                           Attribute
    //                                                                           =========
    @Resource
    protected SearchEngineClient searchEngineClient;

    // ===================================================================================
    //                                                                      Search Execute
    //                                                                      ==============

    /**
     * Handles {@code POST /api/admin/documents/bulk}.
     *
     * <p>Each submitted document is validated with
     * {@link AdminSearchlistAction#validateFields}, converted with
     * {@code fessConfig.convertToStorableDoc}, and given an ID generated by the
     * {@link CrawlingInfoHelper}. When the thumbnail crawler is enabled, every document is
     * offered to the {@link ThumbnailManager}; if the offer is declined, the thumbnail field
     * is removed from that document. All documents are then sent to the update index in one
     * bulk call.</p>
     *
     * @param body request body holding the documents to index
     * @return JSON response with one entry per bulk item ({@code status} plus either
     *         {@code id} on success or {@code message} on failure); the overall status is
     *         {@link Status#FAILED} when the bulk response reports any failure, otherwise
     *         {@link Status#OK}
     */
    // POST /api/admin/documents/bulk
    @Execute
    public JsonResponse<ApiResult> post$bulk(final BulkBody body) {
        validateApi(body, messages -> {});
        // NOTE(review): the checks below rely on throwValidationErrorApi not returning normally;
        // otherwise isEmpty() would be invoked on a null list. Confirm against FessApiAdminAction.
        if (body.documents == null) {
            throwValidationErrorApi(messages -> messages.addErrorsCrudFailedToCreateCrudTable(GLOBAL, "documents is required."));
        }
        if (body.documents.isEmpty()) {
            throwValidationErrorApi(messages -> messages.addErrorsCrudFailedToCreateCrudTable(GLOBAL, "documents is empty."));
        }

        final String indexFieldId = fessConfig.getIndexFieldId();
        final CrawlingInfoHelper crawlingInfoHelper = ComponentUtil.getCrawlingInfoHelper();
        final List<Map<String, Object>> docList = body.documents.stream().map(doc -> {
            AdminSearchlistAction.validateFields(doc, this::throwValidationErrorApi);
            final Map<String, Object> newDoc = fessConfig.convertToStorableDoc(doc);
            // The ID is derived from the converted document and stored in the configured ID field.
            final String newId = crawlingInfoHelper.generateId(newDoc);
            newDoc.put(indexFieldId, newId);
            return newDoc;
        }).toList();

        if (fessConfig.isThumbnailCrawlerEnabled()) {
            final ThumbnailManager thumbnailManager = ComponentUtil.getThumbnailManager();
            final String thumbnailField = fessConfig.getIndexFieldThumbnail();
            docList.forEach(doc -> {
                // A declined offer means the thumbnail field is dropped before indexing.
                if (!thumbnailManager.offer(doc)) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Removing {}={} from doc[{}]", thumbnailField, doc.get(thumbnailField),
                                doc.get(fessConfig.getIndexFieldUrl()));
                    }
                    doc.remove(thumbnailField);
                }
            });
        }

        final CrawlingConfigHelper crawlingConfigHelper = ComponentUtil.getCrawlingConfigHelper();
        final BulkResponse response = searchEngineClient.addAll(fessConfig.getIndexDocumentUpdateIndex(), docList, (doc, builder) -> {
            // A pipeline is only applied when the document carries a String config ID.
            if (doc.get(fessConfig.getIndexFieldConfigId()) instanceof final String configId) {
                crawlingConfigHelper.getPipeline(configId).ifPresent(s -> builder.setPipeline(s));
            }
        });

        final List<Map<String, Object>> items = Arrays.stream(response.getItems()).map(item -> {
            final Map<String, Object> itemMap = new HashMap<>();
            itemMap.put("status", item.status().name());
            if (item.isFailed()) {
                itemMap.put("message", item.getFailureMessage());
            } else {
                itemMap.put("id", item.getId());
            }
            return itemMap;
        }).toList();
        return asJson(new ApiBulkResponse().items(items).status(response.hasFailures() ? Status.FAILED : Status.OK).result());
    }
}

View file

@ -0,0 +1,25 @@
/*
* Copyright 2012-2024 CodeLibs Project and the Others.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language
* governing permissions and limitations under the License.
*/
package org.codelibs.fess.app.web.api.admin.documents;
import java.util.List;
import java.util.Map;
/**
 * Request body holding a list of documents, each expressed as a map from
 * field name to value.
 */
public class BulkBody {

    /** Submitted documents; stays {@code null} until it is assigned. */
    public List<Map<String, Object>> documents;

}

View file

@ -120,7 +120,7 @@ public class ApiAdminSearchlistAction extends FessApiAdminAction {
if (body.doc == null) {
throwValidationErrorApi(messages -> messages.addErrorsCrudFailedToCreateCrudTable(GLOBAL, "doc is required"));
}
validateFields(body, this::throwValidationErrorApi);
validateFields(body.doc, this::throwValidationErrorApi);
body.crudMode = CrudMode.CREATE;
final Map<String, Object> doc = getDoc(body).map(entity -> {
try {
@ -152,7 +152,7 @@ public class ApiAdminSearchlistAction extends FessApiAdminAction {
if (body.doc == null) {
throwValidationErrorApi(messages -> messages.addErrorsCrudFailedToCreateCrudTable(GLOBAL, "doc is required"));
}
validateFields(body, this::throwValidationErrorApi);
validateFields(body.doc, this::throwValidationErrorApi);
body.crudMode = CrudMode.EDIT;
final Map<String, Object> doc = getDoc(body).map(entity -> {
final String index = fessConfig.getIndexDocumentUpdateIndex();

View file

@ -87,7 +87,6 @@ import org.opensearch.OpenSearchException;
import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionType;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.DocWriteRequest.OpType;
import org.opensearch.action.DocWriteResponse.Result;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
@ -100,8 +99,6 @@ import org.opensearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.opensearch.action.admin.indices.refresh.RefreshResponse;
import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse;
import org.opensearch.action.admin.indices.segments.PitSegmentsRequest;
import org.opensearch.action.bulk.BulkItemResponse;
import org.opensearch.action.bulk.BulkItemResponse.Failure;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkRequestBuilder;
import org.opensearch.action.bulk.BulkResponse;
@ -1186,7 +1183,7 @@ public class SearchEngineClient implements Client {
}
}
public String[] addAll(final String index, final List<Map<String, Object>> docList,
public BulkResponse addAll(final String index, final List<Map<String, Object>> docList,
final BiConsumer<Map<String, Object>, IndexRequestBuilder> options) {
final FessConfig fessConfig = ComponentUtil.getFessConfig();
final BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
@ -1196,26 +1193,7 @@ public class SearchEngineClient implements Client {
options.accept(doc, builder);
bulkRequestBuilder.add(builder);
}
final BulkResponse response = bulkRequestBuilder.execute().actionGet(ComponentUtil.getFessConfig().getIndexBulkTimeout());
if (response.hasFailures()) {
if (logger.isDebugEnabled()) {
final List<DocWriteRequest<?>> requests = bulkRequestBuilder.request().requests();
final BulkItemResponse[] items = response.getItems();
if (requests.size() == items.length) {
for (int i = 0; i < requests.size(); i++) {
final BulkItemResponse resp = items[i];
if (resp.isFailed() && resp.getFailure() != null) {
final DocWriteRequest<?> req = requests.get(i);
final Failure failure = resp.getFailure();
logger.debug("Failed Request: {}\n=>{}", req, failure.getMessage());
}
}
}
}
throw new SearchEngineClientException(response.buildFailureMessage());
}
return Arrays.stream(response.getItems()).map(BulkItemResponse::getId).toArray(n -> new String[n]);
return bulkRequestBuilder.execute().actionGet(ComponentUtil.getFessConfig().getIndexBulkTimeout());
}
public static class SearchConditionBuilder {

View file

@ -23,12 +23,16 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.TotalHits;
import org.codelibs.fess.es.client.SearchEngineClient;
import org.codelibs.fess.es.client.SearchEngineClientException;
import org.codelibs.fess.mylasta.direction.FessConfig;
import org.codelibs.fess.thumbnail.ThumbnailManager;
import org.codelibs.fess.util.ComponentUtil;
import org.codelibs.fess.util.DocList;
import org.codelibs.fess.util.MemoryUtil;
import org.opensearch.action.admin.indices.refresh.RefreshResponse;
import org.opensearch.action.bulk.BulkItemResponse;
import org.opensearch.action.bulk.BulkItemResponse.Failure;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilders;
@ -72,10 +76,27 @@ public class IndexingHelper {
if (logger.isDebugEnabled()) {
logger.debug("Deleted {} old docs", deletedDocCount);
}
searchEngineClient.addAll(fessConfig.getIndexDocumentUpdateIndex(), docList, (doc, builder) -> {
final String configId = (String) doc.get(fessConfig.getIndexFieldConfigId());
crawlingConfigHelper.getPipeline(configId).ifPresent(s -> builder.setPipeline(s));
});
final BulkResponse response =
searchEngineClient.addAll(fessConfig.getIndexDocumentUpdateIndex(), docList, (doc, builder) -> {
final String configId = (String) doc.get(fessConfig.getIndexFieldConfigId());
crawlingConfigHelper.getPipeline(configId).ifPresent(s -> builder.setPipeline(s));
});
if (response.hasFailures()) {
if (logger.isDebugEnabled()) {
final BulkItemResponse[] items = response.getItems();
if (docList.size() == items.length) {
for (int i = 0; i < docList.size(); i++) {
final BulkItemResponse resp = items[i];
if (resp.isFailed() && resp.getFailure() != null) {
final Map<String, Object> req = docList.get(i);
final Failure failure = resp.getFailure();
logger.debug("Failed Request: {}\n=>{}", req, failure.getMessage());
}
}
}
}
throw new SearchEngineClientException(response.buildFailureMessage());
}
}
if (logger.isInfoEnabled()) {
if (docList.getContentSize() > 0) {

View file

@ -1362,9 +1362,14 @@ public interface FessProp {
/**
 * Collects the configured admin integer fields whose value in {@code source} fails
 * integer validation.
 *
 * <p>Fields for which {@code isNonEmptyValue(source.get(name))} is false are skipped.
 * A value that is already a {@link Number} is accepted without further checks; any other
 * value is validated through its {@code toString()} form.</p>
 *
 * @param source document to inspect, keyed by field name
 * @return names of the integer fields holding an invalid value
 */
default List<String> invalidIndexIntegerFields(final Map<String, Object> source) {
    final IntegerTypeValidator integerValidator = new IntegerTypeValidator();
    return split(getIndexAdminIntegerFields(), ",").get(
            stream -> stream.filter(StringUtil::isNotBlank).map(String::trim).filter(s -> isNonEmptyValue(source.get(s))).filter(s -> {
                final Object obj = source.get(s);
                // Numeric values need no string validation.
                if (obj instanceof Number) {
                    return false;
                }
                return !integerValidator.isValid(obj.toString(), null);
            }).collect(Collectors.toList()));
}
String getIndexAdminLongFields();
@ -1386,9 +1391,14 @@ public interface FessProp {
/**
 * Collects the configured admin long fields whose value in {@code source} fails
 * long validation.
 *
 * <p>Fields for which {@code isNonEmptyValue(source.get(name))} is false are skipped.
 * A value that is already a {@link Number} is accepted without further checks; any other
 * value is validated through its {@code toString()} form.</p>
 *
 * @param source document to inspect, keyed by field name
 * @return names of the long fields holding an invalid value
 */
default List<String> invalidIndexLongFields(final Map<String, Object> source) {
    final LongTypeValidator longValidator = new LongTypeValidator();
    return split(getIndexAdminLongFields(), ",").get(
            stream -> stream.filter(StringUtil::isNotBlank).map(String::trim).filter(s -> isNonEmptyValue(source.get(s))).filter(s -> {
                final Object obj = source.get(s);
                // Numeric values need no string validation.
                if (obj instanceof Number) {
                    return false;
                }
                return !longValidator.isValid(obj.toString(), null);
            }).collect(Collectors.toList()));
}
String getIndexAdminFloatFields();
@ -1410,9 +1420,13 @@ public interface FessProp {
/**
 * Collects the configured admin float fields whose value in {@code source} fails
 * float validation.
 *
 * <p>Fields for which {@code isNonEmptyValue(source.get(name))} is false are skipped, as in
 * the integer and long variants. Without that guard, a configured field that is missing
 * from {@code source} would reach {@code obj.toString()} with a null value and throw a
 * {@link NullPointerException}. A value that is already a {@link Number} is accepted
 * without further checks; any other value is validated through its {@code toString()}
 * form.</p>
 *
 * @param source document to inspect, keyed by field name
 * @return names of the float fields holding an invalid value
 */
default List<String> invalidIndexFloatFields(final Map<String, Object> source) {
    final FloatTypeValidator floatValidator = new FloatTypeValidator();
    return split(getIndexAdminFloatFields(), ",").get(
            stream -> stream.filter(StringUtil::isNotBlank).map(String::trim).filter(s -> isNonEmptyValue(source.get(s))).filter(s -> {
                final Object obj = source.get(s);
                // Numeric values need no string validation.
                if (obj instanceof Number) {
                    return false;
                }
                return !floatValidator.isValid(obj.toString(), null);
            }).collect(Collectors.toList()));
}
String getIndexAdminDoubleFields();
@ -1434,9 +1448,13 @@ public interface FessProp {
/**
 * Collects the configured admin double fields whose value in {@code source} fails
 * double validation.
 *
 * <p>Fields for which {@code isNonEmptyValue(source.get(name))} is false are skipped, as in
 * the integer and long variants. Without that guard, a configured field that is missing
 * from {@code source} would reach {@code obj.toString()} with a null value and throw a
 * {@link NullPointerException}. A value that is already a {@link Number} is accepted
 * without further checks; any other value is validated through its {@code toString()}
 * form.</p>
 *
 * @param source document to inspect, keyed by field name
 * @return names of the double fields holding an invalid value
 */
default List<String> invalidIndexDoubleFields(final Map<String, Object> source) {
    final DoubleTypeValidator doubleValidator = new DoubleTypeValidator();
    return split(getIndexAdminDoubleFields(), ",").get(
            stream -> stream.filter(StringUtil::isNotBlank).map(String::trim).filter(s -> isNonEmptyValue(source.get(s))).filter(s -> {
                final Object obj = source.get(s);
                // Numeric values need no string validation.
                if (obj instanceof Number) {
                    return false;
                }
                return !doubleValidator.isValid(obj.toString(), null);
            }).collect(Collectors.toList()));
}
default Map<String, Object> convertToEditableDoc(final Map<String, Object> source) {
@ -1488,19 +1506,41 @@ public interface FessProp {
final String key = e.getKey();
Object value = e.getValue();
if (arrayFieldSet.contains(key)) {
value = split(value.toString(), "\n")
.get(stream -> stream.filter(StringUtil::isNotBlank).map(String::trim).collect(Collectors.toList()));
if (value instanceof String[]) {
value = Arrays.stream((String[]) value).toList();
} else if (value instanceof List<?>) {
// nothing
} else {
value = split(value.toString(), "\n")
.get(stream -> stream.filter(StringUtil::isNotBlank).map(String::trim).collect(Collectors.toList()));
}
} else if (dateFieldSet.contains(key)) {
// TODO time zone
value = FessFunctions.parseDate(value.toString());
} else if (integerFieldSet.contains(key)) {
value = DfTypeUtil.toInteger(value.toString());
if (value instanceof Number num) {
value = num.intValue();
} else {
value = DfTypeUtil.toInteger(value.toString());
}
} else if (longFieldSet.contains(key)) {
value = DfTypeUtil.toLong(value.toString());
if (value instanceof Number num) {
value = num.longValue();
} else {
value = DfTypeUtil.toLong(value.toString());
}
} else if (floatFieldSet.contains(key)) {
value = DfTypeUtil.toFloat(value.toString());
if (value instanceof Number num) {
value = num.floatValue();
} else {
value = DfTypeUtil.toFloat(value.toString());
}
} else if (doubleFieldSet.contains(key)) {
value = DfTypeUtil.toDouble(value.toString());
if (value instanceof Number num) {
value = num.doubleValue();
} else {
value = DfTypeUtil.toDouble(value.toString());
}
}
return new Pair<>(key, value);
}).collect(Collectors.toMap(Pair::getFirst, Pair::getSecond));

View file

@ -33,6 +33,8 @@ import org.codelibs.fess.unit.UnitFessTestCase;
import org.codelibs.fess.util.ComponentUtil;
import org.codelibs.fess.util.DocList;
import org.dbflute.optional.OptionalEntity;
import org.opensearch.action.bulk.BulkItemResponse;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.index.IndexAction;
import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.action.search.SearchAction;
@ -87,12 +89,12 @@ public class IndexingHelperTest extends UnitFessTestCase {
final List<Map<String, Object>> sentDocList = new ArrayList<>();
final SearchEngineClient client = new SearchEngineClient() {
@Override
public String[] addAll(final String index, final List<Map<String, Object>> docList,
public BulkResponse addAll(final String index, final List<Map<String, Object>> docList,
final BiConsumer<Map<String, Object>, IndexRequestBuilder> options) {
sentIndex.set(index);
docList.forEach(x -> options.accept(x, new IndexRequestBuilder(this, IndexAction.INSTANCE)));
sentDocList.addAll(docList);
return docList.stream().map(x -> (String) x.get("id")).toArray(n -> new String[n]);
return new BulkResponse(new BulkItemResponse[0], documentSizeByQuery);
}
@Override