|
@@ -18,6 +18,8 @@ package org.codelibs.fess.ds.callback;
|
|
import static org.codelibs.core.stream.StreamUtil.stream;
|
|
import static org.codelibs.core.stream.StreamUtil.stream;
|
|
|
|
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
|
|
+import java.util.Deque;
|
|
|
|
+import java.util.LinkedList;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
import java.util.concurrent.ExecutorService;
|
|
import java.util.concurrent.ExecutorService;
|
|
@@ -48,6 +50,7 @@ import org.codelibs.fess.exception.DataStoreCrawlingException;
|
|
import org.codelibs.fess.helper.IndexingHelper;
|
|
import org.codelibs.fess.helper.IndexingHelper;
|
|
import org.codelibs.fess.mylasta.direction.FessConfig;
|
|
import org.codelibs.fess.mylasta.direction.FessConfig;
|
|
import org.codelibs.fess.util.ComponentUtil;
|
|
import org.codelibs.fess.util.ComponentUtil;
|
|
|
|
+import org.elasticsearch.index.query.QueryBuilders;
|
|
import org.lastaflute.di.core.SingletonLaContainer;
|
|
import org.lastaflute.di.core.SingletonLaContainer;
|
|
|
|
|
|
public class FileListIndexUpdateCallbackImpl implements IndexUpdateCallback {
|
|
public class FileListIndexUpdateCallbackImpl implements IndexUpdateCallback {
|
|
@@ -59,9 +62,9 @@ public class FileListIndexUpdateCallbackImpl implements IndexUpdateCallback {
|
|
|
|
|
|
protected List<String> deleteUrlList = new ArrayList<>(100);
|
|
protected List<String> deleteUrlList = new ArrayList<>(100);
|
|
|
|
|
|
- protected int maxDeleteDocumentCacheSize = 100;
|
|
|
|
|
|
+ protected int maxDeleteDocumentCacheSize;
|
|
|
|
|
|
- protected int maxRedirectCount = 10;
|
|
|
|
|
|
+ protected int maxRedirectCount;
|
|
|
|
|
|
private final ExecutorService executor;
|
|
private final ExecutorService executor;
|
|
|
|
|
|
@@ -72,6 +75,9 @@ public class FileListIndexUpdateCallbackImpl implements IndexUpdateCallback {
|
|
this.indexUpdateCallback = indexUpdateCallback;
|
|
this.indexUpdateCallback = indexUpdateCallback;
|
|
this.crawlerClientFactory = crawlerClientFactory;
|
|
this.crawlerClientFactory = crawlerClientFactory;
|
|
executor = newFixedThreadPool(nThreads < 1 ? 1 : nThreads);
|
|
executor = newFixedThreadPool(nThreads < 1 ? 1 : nThreads);
|
|
|
|
+ final FessConfig fessConfig = ComponentUtil.getFessConfig();
|
|
|
|
+ maxDeleteDocumentCacheSize = fessConfig.getIndexerDataMaxDeleteCacheSizeAsInteger();
|
|
|
|
+ maxRedirectCount = fessConfig.getIndexerDataMaxRedirectCountAsInteger();
|
|
}
|
|
}
|
|
|
|
|
|
protected ExecutorService newFixedThreadPool(final int nThreads) {
|
|
protected ExecutorService newFixedThreadPool(final int nThreads) {
|
|
@@ -119,17 +125,56 @@ public class FileListIndexUpdateCallbackImpl implements IndexUpdateCallback {
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
|
|
|
|
- String processingUrl = url;
|
|
|
|
- for (int i = 0; i < maxRedirectCount; i++) {
|
|
|
|
- processingUrl = processRequest(paramMap, dataMap, processingUrl, client);
|
|
|
|
- if (processingUrl == null) {
|
|
|
|
- break;
|
|
|
|
|
|
+ final long maxAccessCount = getMaxAccessCount(paramMap, dataMap);
|
|
|
|
+ long counter = 0;
|
|
|
|
+ final Deque<String> urlQueue = new LinkedList<>();
|
|
|
|
+ urlQueue.offer(url);
|
|
|
|
+ while (!urlQueue.isEmpty() && (maxAccessCount < 0 || counter < maxAccessCount)) {
|
|
|
|
+ String processingUrl = urlQueue.poll();
|
|
|
|
+ if (deleteUrlList.contains(processingUrl)) {
|
|
|
|
+ deleteDocuments(); // delete before indexing
|
|
|
|
+ }
|
|
|
|
+ try {
|
|
|
|
+ for (int i = 0; i < maxRedirectCount; i++) {
|
|
|
|
+ processingUrl = processRequest(paramMap, dataMap, processingUrl, client);
|
|
|
|
+ if (processingUrl == null) {
|
|
|
|
+ break;
|
|
|
|
+ }
|
|
|
|
+ counter++;
|
|
|
|
+ dataMap.put(fessConfig.getIndexFieldUrl(), processingUrl);
|
|
|
|
+ }
|
|
|
|
+ } catch (ChildUrlsException e) {
|
|
|
|
+ e.getChildUrlList().stream().map(req -> req.getUrl()).forEach(urlQueue::offer);
|
|
|
|
+ } catch (DataStoreCrawlingException e) {
|
|
|
|
+ Throwable cause = e.getCause();
|
|
|
|
+ if (cause instanceof ChildUrlsException) {
|
|
|
|
+ ((ChildUrlsException) cause).getChildUrlList().stream().map(RequestData::getUrl).forEach(urlQueue::offer);
|
|
|
|
+ } else {
|
|
|
|
+ if (maxAccessCount != 1L) {
|
|
|
|
+ throw e;
|
|
|
|
+ } else {
|
|
|
|
+ logger.warn("Failed to access {}.", processingUrl, e);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|
|
- dataMap.put(fessConfig.getIndexFieldUrl(), processingUrl);
|
|
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ protected long getMaxAccessCount(final Map<String, String> paramMap, final Map<String, Object> dataMap) {
|
|
|
|
+ final Object recursive = dataMap.remove(getParamValue(paramMap, "field.recursive", "recursive"));
|
|
|
|
+ if (recursive == null || Constants.FALSE.equalsIgnoreCase(recursive.toString())) {
|
|
|
|
+ return 1L;
|
|
|
|
+ } else if (Constants.TRUE.equalsIgnoreCase(recursive.toString())) {
|
|
|
|
+ return -1L;
|
|
|
|
+ }
|
|
|
|
+ try {
|
|
|
|
+ return Long.parseLong(recursive.toString());
|
|
|
|
+ } catch (NumberFormatException e) {
|
|
|
|
+ return 1L;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
protected String processRequest(final Map<String, String> paramMap, final Map<String, Object> dataMap, final String url,
|
|
protected String processRequest(final Map<String, String> paramMap, final Map<String, Object> dataMap, final String url,
|
|
final CrawlerClient client) {
|
|
final CrawlerClient client) {
|
|
final long startTime = System.currentTimeMillis();
|
|
final long startTime = System.currentTimeMillis();
|
|
@@ -176,8 +221,8 @@ public class FileListIndexUpdateCallbackImpl implements IndexUpdateCallback {
|
|
|
|
|
|
indexUpdateCallback.store(paramMap, dataMap);
|
|
indexUpdateCallback.store(paramMap, dataMap);
|
|
} else {
|
|
} else {
|
|
- logger.warn("The response processor is not DefaultResponseProcessor. responseProcessor: " + responseProcessor
|
|
|
|
- + ", Data: " + dataMap);
|
|
|
|
|
|
+ logger.warn("The response processor is not DefaultResponseProcessor. responseProcessor: {}, Data: {}",
|
|
|
|
+ responseProcessor, dataMap);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return null;
|
|
return null;
|
|
@@ -204,12 +249,23 @@ public class FileListIndexUpdateCallbackImpl implements IndexUpdateCallback {
|
|
}
|
|
}
|
|
|
|
|
|
synchronized (indexUpdateCallback) {
|
|
synchronized (indexUpdateCallback) {
|
|
- deleteUrlList.add(dataMap.get(fessConfig.getIndexFieldUrl()).toString());
|
|
|
|
|
|
+ final long maxAccessCount = getMaxAccessCount(paramMap, dataMap);
|
|
|
|
+ final String url = dataMap.get(fessConfig.getIndexFieldUrl()).toString();
|
|
|
|
+ if (maxAccessCount != 1L) {
|
|
|
|
+ final FessEsClient fessEsClient = ComponentUtil.getFessEsClient();
|
|
|
|
+ final IndexingHelper indexingHelper = ComponentUtil.getIndexingHelper();
|
|
|
|
+ final long count =
|
|
|
|
+ indexingHelper.deleteDocumentByQuery(fessEsClient, QueryBuilders.prefixQuery(fessConfig.getIndexFieldUrl(), url));
|
|
|
|
+ if (logger.isDebugEnabled()) {
|
|
|
|
+ logger.debug("Deleted {} docs for {}*", count, url);
|
|
|
|
+ }
|
|
|
|
+ } else {
|
|
|
|
+ deleteUrlList.add(url);
|
|
|
|
|
|
- if (deleteUrlList.size() >= maxDeleteDocumentCacheSize) {
|
|
|
|
- deleteDocuments();
|
|
|
|
|
|
+ if (deleteUrlList.size() >= maxDeleteDocumentCacheSize) {
|
|
|
|
+ deleteDocuments();
|
|
|
|
+ }
|
|
}
|
|
}
|
|
-
|
|
|
|
}
|
|
}
|
|
return true;
|
|
return true;
|
|
}
|
|
}
|