|
@@ -18,6 +18,8 @@ package org.codelibs.fess.ds.impl;
|
|
|
import java.util.Map;
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
|
|
+import javax.annotation.PostConstruct;
|
|
|
+
|
|
|
import org.codelibs.fess.ds.IndexUpdateCallback;
|
|
|
import org.codelibs.fess.es.client.FessEsClient;
|
|
|
import org.codelibs.fess.exception.FessSystemException;
|
|
@@ -28,23 +30,31 @@ import org.codelibs.fess.helper.SystemHelper;
|
|
|
import org.codelibs.fess.mylasta.direction.FessConfig;
|
|
|
import org.codelibs.fess.util.ComponentUtil;
|
|
|
import org.codelibs.fess.util.DocList;
|
|
|
+import org.codelibs.fess.util.DocumentUtil;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
public class IndexUpdateCallbackImpl implements IndexUpdateCallback {
|
|
|
private static final Logger logger = LoggerFactory.getLogger(IndexUpdateCallbackImpl.class);
|
|
|
|
|
|
- protected volatile AtomicLong documentSize = new AtomicLong(0);
|
|
|
+ protected AtomicLong documentSize = new AtomicLong(0);
|
|
|
|
|
|
protected volatile long executeTime = 0;
|
|
|
|
|
|
- final DocList docList = new DocList();
|
|
|
+ protected final DocList docList = new DocList();
|
|
|
+
|
|
|
+ protected long maxDocumentRequestSize;
|
|
|
+
|
|
|
+ @PostConstruct
|
|
|
+ public void init() {
|
|
|
+ maxDocumentRequestSize = ComponentUtil.getFessConfig().getIndexerWebfsMaxDocumentRequestSizeAsInteger().longValue();
|
|
|
+ }
|
|
|
|
|
|
/* (non-Javadoc)
|
|
|
* @see org.codelibs.fess.ds.impl.IndexUpdateCallback#store(java.util.Map)
|
|
|
*/
|
|
|
@Override
|
|
|
- public synchronized boolean store(final Map<String, String> paramMap, final Map<String, Object> dataMap) {
|
|
|
+ public boolean store(final Map<String, String> paramMap, final Map<String, Object> dataMap) {
|
|
|
final long startTime = System.currentTimeMillis();
|
|
|
final FessConfig fessConfig = ComponentUtil.getFessConfig();
|
|
|
final FessEsClient fessEsClient = ComponentUtil.getElasticsearchClient();
|
|
@@ -78,30 +88,41 @@ public class IndexUpdateCallbackImpl implements IndexUpdateCallback {
|
|
|
dataMap.put(fessConfig.getIndexFieldDocId(), systemHelper.generateDocId(dataMap));
|
|
|
}
|
|
|
|
|
|
- docList.add(dataMap);
|
|
|
- if (logger.isDebugEnabled()) {
|
|
|
- logger.debug("Added the document. " + "The number of a document cache is " + docList.size() + ".");
|
|
|
+ synchronized (docList) {
|
|
|
+ docList.add(dataMap);
|
|
|
+ if (logger.isDebugEnabled()) {
|
|
|
+ logger.debug("Added the document. " + "The number of a document cache is " + docList.size() + ".");
|
|
|
+ }
|
|
|
+
|
|
|
+ final Long contentLength = DocumentUtil.getValue(dataMap, fessConfig.getIndexFieldContentLength(), Long.class);
|
|
|
+ if (contentLength != null) {
|
|
|
+ docList.addContentSize(contentLength.longValue());
|
|
|
+ if (docList.getContentSize() >= maxDocumentRequestSize) {
|
|
|
+ indexingHelper.sendDocuments(fessEsClient, docList);
|
|
|
+ }
|
|
|
+ } else if (docList.size() >= fessConfig.getIndexerDataMaxDocumentCacheSizeAsInteger().intValue()) {
|
|
|
+ indexingHelper.sendDocuments(fessEsClient, docList);
|
|
|
+ }
|
|
|
+ executeTime += System.currentTimeMillis() - startTime;
|
|
|
}
|
|
|
|
|
|
- if (docList.size() >= fessConfig.getIndexerDataMaxDocumentCacheSizeAsInteger().intValue()) {
|
|
|
- indexingHelper.sendDocuments(fessEsClient, docList);
|
|
|
- }
|
|
|
documentSize.getAndIncrement();
|
|
|
|
|
|
if (logger.isDebugEnabled()) {
|
|
|
logger.debug("The number of an added document is " + documentSize.get() + ".");
|
|
|
}
|
|
|
|
|
|
- executeTime += System.currentTimeMillis() - startTime;
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void commit() {
|
|
|
- if (!docList.isEmpty()) {
|
|
|
- final IndexingHelper indexingHelper = ComponentUtil.getIndexingHelper();
|
|
|
- final FessEsClient fessEsClient = ComponentUtil.getElasticsearchClient();
|
|
|
- indexingHelper.sendDocuments(fessEsClient, docList);
|
|
|
+ synchronized (docList) {
|
|
|
+ if (!docList.isEmpty()) {
|
|
|
+ final IndexingHelper indexingHelper = ComponentUtil.getIndexingHelper();
|
|
|
+ final FessEsClient fessEsClient = ComponentUtil.getElasticsearchClient();
|
|
|
+ indexingHelper.sendDocuments(fessEsClient, docList);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|