|
@@ -28,14 +28,14 @@ import org.codelibs.fess.Constants;
|
|
|
import org.codelibs.fess.crawler.Crawler;
|
|
|
import org.codelibs.fess.crawler.entity.AccessResult;
|
|
|
import org.codelibs.fess.crawler.entity.AccessResultData;
|
|
|
-import org.codelibs.fess.crawler.entity.EsAccessResult;
|
|
|
-import org.codelibs.fess.crawler.entity.EsUrlQueue;
|
|
|
+import org.codelibs.fess.crawler.entity.OpenSearchAccessResult;
|
|
|
+import org.codelibs.fess.crawler.entity.OpenSearchUrlQueue;
|
|
|
import org.codelibs.fess.crawler.service.DataService;
|
|
|
import org.codelibs.fess.crawler.service.UrlFilterService;
|
|
|
import org.codelibs.fess.crawler.service.UrlQueueService;
|
|
|
-import org.codelibs.fess.crawler.service.impl.EsDataService;
|
|
|
+import org.codelibs.fess.crawler.service.impl.OpenSearchDataService;
|
|
|
import org.codelibs.fess.crawler.transformer.Transformer;
|
|
|
-import org.codelibs.fess.crawler.util.EsResultList;
|
|
|
+import org.codelibs.fess.crawler.util.OpenSearchResultList;
|
|
|
import org.codelibs.fess.exception.ContainerNotAvailableException;
|
|
|
import org.codelibs.fess.exception.FessSystemException;
|
|
|
import org.codelibs.fess.helper.IndexingHelper;
|
|
@@ -70,10 +70,10 @@ public class IndexUpdater extends Thread {
|
|
|
protected SearchEngineClient searchEngineClient;
|
|
|
|
|
|
@Resource
|
|
|
- protected DataService<EsAccessResult> dataService;
|
|
|
+ protected DataService<OpenSearchAccessResult> dataService;
|
|
|
|
|
|
@Resource
|
|
|
- protected UrlQueueService<EsUrlQueue> urlQueueService;
|
|
|
+ protected UrlQueueService<OpenSearchUrlQueue> urlQueueService;
|
|
|
|
|
|
@Resource
|
|
|
protected UrlFilterService urlFilterService;
|
|
@@ -176,17 +176,17 @@ public class IndexUpdater extends Thread {
|
|
|
try {
|
|
|
final Consumer<SearchRequestBuilder> cb = builder -> {
|
|
|
final QueryBuilder queryBuilder =
|
|
|
- QueryBuilders.boolQuery().filter(QueryBuilders.termsQuery(EsAccessResult.SESSION_ID, sessionIdList))
|
|
|
- .filter(QueryBuilders.termQuery(EsAccessResult.STATUS, org.codelibs.fess.crawler.Constants.OK_STATUS));
|
|
|
+ QueryBuilders.boolQuery().filter(QueryBuilders.termsQuery(OpenSearchAccessResult.SESSION_ID, sessionIdList)).filter(
|
|
|
+ QueryBuilders.termQuery(OpenSearchAccessResult.STATUS, org.codelibs.fess.crawler.Constants.OK_STATUS));
|
|
|
builder.setQuery(queryBuilder);
|
|
|
builder.setFrom(0);
|
|
|
final int maxDocumentCacheSize = fessConfig.getIndexerWebfsMaxDocumentCacheSizeAsInteger();
|
|
|
builder.setSize(maxDocumentCacheSize <= 0 ? 1 : maxDocumentCacheSize);
|
|
|
- builder.addSort(EsAccessResult.CREATE_TIME, SortOrder.ASC);
|
|
|
+ builder.addSort(OpenSearchAccessResult.CREATE_TIME, SortOrder.ASC);
|
|
|
};
|
|
|
|
|
|
final DocList docList = new DocList();
|
|
|
- final List<EsAccessResult> accessResultList = new ArrayList<>();
|
|
|
+ final List<OpenSearchAccessResult> accessResultList = new ArrayList<>();
|
|
|
|
|
|
long updateTime = systemHelper.getCurrentTimeAsLong();
|
|
|
int errorCount = 0;
|
|
@@ -219,13 +219,13 @@ public class IndexUpdater extends Thread {
|
|
|
|
|
|
updateTime = systemHelper.getCurrentTimeAsLong();
|
|
|
|
|
|
- List<EsAccessResult> arList = getAccessResultList(cb, cleanupTime);
|
|
|
+ List<OpenSearchAccessResult> arList = getAccessResultList(cb, cleanupTime);
|
|
|
if (arList.isEmpty()) {
|
|
|
emptyListCount++;
|
|
|
} else {
|
|
|
emptyListCount = 0; // reset
|
|
|
}
|
|
|
- long hitCount = ((EsResultList<EsAccessResult>) arList).getTotalHits();
|
|
|
+ long hitCount = ((OpenSearchResultList<OpenSearchAccessResult>) arList).getTotalHits();
|
|
|
while (hitCount > 0) {
|
|
|
if (arList.isEmpty()) {
|
|
|
ThreadUtil.sleep(fessConfig.getIndexerWebfsCommitMarginTimeAsInteger().longValue());
|
|
@@ -235,7 +235,7 @@ public class IndexUpdater extends Thread {
|
|
|
cleanupTime = cleanupAccessResults(accessResultList);
|
|
|
}
|
|
|
arList = getAccessResultList(cb, cleanupTime);
|
|
|
- hitCount = ((EsResultList<EsAccessResult>) arList).getTotalHits();
|
|
|
+ hitCount = ((OpenSearchResultList<OpenSearchAccessResult>) arList).getTotalHits();
|
|
|
}
|
|
|
if (!docList.isEmpty()) {
|
|
|
indexingHelper.sendDocuments(searchEngineClient, docList);
|
|
@@ -320,11 +320,11 @@ public class IndexUpdater extends Thread {
|
|
|
|
|
|
}
|
|
|
|
|
|
- private void processAccessResults(final DocList docList, final List<EsAccessResult> accessResultList,
|
|
|
- final List<EsAccessResult> arList) {
|
|
|
+ private void processAccessResults(final DocList docList, final List<OpenSearchAccessResult> accessResultList,
|
|
|
+ final List<OpenSearchAccessResult> arList) {
|
|
|
final FessConfig fessConfig = ComponentUtil.getFessConfig();
|
|
|
final long maxDocumentRequestSize = Long.parseLong(fessConfig.getIndexerWebfsMaxDocumentRequestSize());
|
|
|
- for (final EsAccessResult accessResult : arList) {
|
|
|
+ for (final OpenSearchAccessResult accessResult : arList) {
|
|
|
if (logger.isDebugEnabled()) {
|
|
|
logger.debug("Indexing {}", accessResult.getUrl());
|
|
|
}
|
|
@@ -395,7 +395,7 @@ public class IndexUpdater extends Thread {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private AccessResultData<?> getAccessResultData(final EsAccessResult accessResult) {
|
|
|
+ private AccessResultData<?> getAccessResultData(final OpenSearchAccessResult accessResult) {
|
|
|
try {
|
|
|
return accessResult.getAccessResultData();
|
|
|
} catch (final Exception e) {
|
|
@@ -483,7 +483,7 @@ public class IndexUpdater extends Thread {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private long cleanupAccessResults(final List<EsAccessResult> accessResultList) {
|
|
|
+ private long cleanupAccessResults(final List<OpenSearchAccessResult> accessResultList) {
|
|
|
if (!accessResultList.isEmpty()) {
|
|
|
final long execTime = systemHelper.getCurrentTimeAsLong();
|
|
|
final int size = accessResultList.size();
|
|
@@ -498,12 +498,12 @@ public class IndexUpdater extends Thread {
|
|
|
return -1;
|
|
|
}
|
|
|
|
|
|
- private List<EsAccessResult> getAccessResultList(final Consumer<SearchRequestBuilder> cb, final long cleanupTime) {
|
|
|
+ private List<OpenSearchAccessResult> getAccessResultList(final Consumer<SearchRequestBuilder> cb, final long cleanupTime) {
|
|
|
if (logger.isDebugEnabled()) {
|
|
|
logger.debug("Getting documents in IndexUpdater queue.");
|
|
|
}
|
|
|
final long execTime = systemHelper.getCurrentTimeAsLong();
|
|
|
- final List<EsAccessResult> arList = ((EsDataService) dataService).getAccessResultList(cb);
|
|
|
+ final List<OpenSearchAccessResult> arList = ((OpenSearchDataService) dataService).getAccessResultList(cb);
|
|
|
final FessConfig fessConfig = ComponentUtil.getFessConfig();
|
|
|
if (!arList.isEmpty()) {
|
|
|
final long commitMarginTime = fessConfig.getIndexerWebfsCommitMarginTimeAsInteger().longValue();
|
|
@@ -513,7 +513,7 @@ public class IndexUpdater extends Thread {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- final long totalHits = ((EsResultList<EsAccessResult>) arList).getTotalHits();
|
|
|
+ final long totalHits = ((OpenSearchResultList<OpenSearchAccessResult>) arList).getTotalHits();
|
|
|
if (logger.isInfoEnabled()) {
|
|
|
final StringBuilder buf = new StringBuilder(100);
|
|
|
buf.append("Processing ");
|