fix #2728 add usePipeline
This commit is contained in:
parent
18c5342482
commit
5fc859c1aa
3 changed files with 53 additions and 2 deletions
|
@ -73,6 +73,7 @@ import org.codelibs.fess.query.QueryFieldConfig;
|
|||
import org.codelibs.fess.util.BooleanFunction;
|
||||
import org.codelibs.fess.util.ComponentUtil;
|
||||
import org.codelibs.fess.util.DocMap;
|
||||
import org.codelibs.fess.util.SearchEngineUtil;
|
||||
import org.codelibs.fess.util.SystemUtil;
|
||||
import org.codelibs.opensearch.runner.OpenSearchRunner;
|
||||
import org.codelibs.opensearch.runner.OpenSearchRunner.Configs;
|
||||
|
@ -164,7 +165,10 @@ import org.opensearch.common.xcontent.XContentType;
|
|||
import org.opensearch.index.query.InnerHitBuilder;
|
||||
import org.opensearch.index.query.QueryBuilder;
|
||||
import org.opensearch.index.query.QueryBuilders;
|
||||
import org.opensearch.index.reindex.UpdateByQueryRequest;
|
||||
import org.opensearch.rest.RestStatus;
|
||||
import org.opensearch.script.Script;
|
||||
import org.opensearch.script.ScriptType;
|
||||
import org.opensearch.search.SearchHit;
|
||||
import org.opensearch.search.SearchHits;
|
||||
import org.opensearch.search.aggregations.AggregationBuilders;
|
||||
|
@ -220,6 +224,8 @@ public class SearchEngineClient implements Client {
|
|||
|
||||
protected final List<UnaryOperator<String>> docMappingRewriteRuleList = new ArrayList<>();
|
||||
|
||||
protected boolean usePipeline = false;
|
||||
|
||||
public void addIndexConfig(final String path) {
|
||||
indexConfigList.add(path);
|
||||
}
|
||||
|
@ -245,6 +251,10 @@ public class SearchEngineClient implements Client {
|
|||
return this.runner != null;
|
||||
}
|
||||
|
||||
public void usePipeline() {
|
||||
this.usePipeline = true;
|
||||
}
|
||||
|
||||
protected InetAddress getInetAddressByName(final String host) {
|
||||
try {
|
||||
return InetAddress.getByName(host);
|
||||
|
@ -1090,12 +1100,40 @@ public class SearchEngineClient implements Client {
|
|||
}
|
||||
|
||||
public boolean update(final String index, final String id, final String field, final Object value) {
|
||||
// Using ingest pipelines with doc_as_upsert is not supported.
|
||||
if (usePipeline) {
|
||||
return updateByIdWithScript(index, id, field, value);
|
||||
}
|
||||
try {
|
||||
final Result result = client.prepareUpdate().setIndex(index).setId(id).setDoc(field, value).execute()
|
||||
.actionGet(ComponentUtil.getFessConfig().getIndexIndexTimeout()).getResult();
|
||||
return result == Result.CREATED || result == Result.UPDATED;
|
||||
} catch (final OpenSearchException e) {
|
||||
throw new SearchEngineClientException("Failed to set " + value + " to " + field + " for doc " + id, e);
|
||||
throw new SearchEngineClientException("[" + index + "] Failed to set " + value + " to " + field + " for doc " + id, e);
|
||||
}
|
||||
}
|
||||
|
||||
protected boolean updateByIdWithScript(final String index, final String id, final String field, final Object value) {
|
||||
final FessConfig fessConfig = ComponentUtil.getFessConfig();
|
||||
final UpdateByQueryRequest request = new UpdateByQueryRequest(index).setQuery(QueryBuilders.idsQuery().addIds(id))
|
||||
.setScript(new Script(ScriptType.INLINE, "painless",
|
||||
"ctx._source[params.f]=params.v;" + ComponentUtil.getLanguageHelper().getReindexScriptSource(),
|
||||
Map.of("f", field, "v", value)));
|
||||
try {
|
||||
final String source = SearchEngineUtil.getXContentString(request, XContentType.JSON);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("update script by id: {}", source);
|
||||
}
|
||||
final String refresh = StringUtil.isNotBlank(fessConfig.getIndexReindexRefresh()) ? fessConfig.getIndexReindexRefresh() : null;
|
||||
try (CurlResponse response = ComponentUtil.getCurlHelper().post("/" + index + "/_update_by_query").param("refresh", refresh)
|
||||
.param("max_docs", "1").body(source).execute()) {
|
||||
if (response.getHttpStatusCode() == 200) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
} catch (final IOException e) {
|
||||
throw new SearchEngineClientException("[" + index + "] Failed to set " + value + " to " + field + " for doc " + id, e);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -23,9 +23,11 @@ import java.util.function.Function;
|
|||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.codelibs.fess.es.client.SearchEngineClient;
|
||||
import org.lastaflute.di.exception.IORuntimeException;
|
||||
import org.opensearch.common.xcontent.ToXContent;
|
||||
import org.opensearch.common.xcontent.XContentBuilder;
|
||||
import org.opensearch.common.xcontent.XContentFactory;
|
||||
import org.opensearch.common.xcontent.XContentHelper;
|
||||
import org.opensearch.common.xcontent.XContentType;
|
||||
import org.opensearch.search.SearchHit;
|
||||
|
||||
|
@ -58,6 +60,14 @@ public final class SearchEngineUtil {
|
|||
hit -> callback.apply(hit));
|
||||
}
|
||||
|
||||
public static String getXContentString(final ToXContent xContent, final XContentType xContentType) {
|
||||
try {
|
||||
return XContentHelper.toXContent(xContent, xContentType, ToXContent.EMPTY_PARAMS, false).utf8ToString();
|
||||
} catch (IOException e) {
|
||||
throw new IORuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public interface XContentBuilderCallback {
|
||||
XContentBuilder apply(XContentBuilder builder, ToXContent.Params params) throws IOException;
|
||||
}
|
||||
|
|
|
@ -6,8 +6,11 @@
|
|||
<property name="settings">
|
||||
{"http.cors.enabled":"true",
|
||||
"http.cors.allow-origin":"*",
|
||||
"node.name":"search_engine",
|
||||
"discovery.seed_hosts":"search_engine",
|
||||
"cluster.initial_cluster_manager_nodes":"search_engine",
|
||||
"node.roles":"cluster_manager,data,ingest,ml",
|
||||
"indices.breaker.total.limit":"100%",
|
||||
"discovery.type":"single-node",
|
||||
"action.auto_create_index":"-*"}
|
||||
</property>
|
||||
<!-- Dictionaries -->
|
||||
|
|
Loading…
Add table
Reference in a new issue