Browse Source

fix #475 : multithread crawling support

Shinsuke Sugaya 9 years ago
parent
commit
3d32dc05d5

+ 2 - 0
src/main/java/org/codelibs/fess/Constants.java

@@ -218,6 +218,8 @@ public class Constants extends CoreLibConstants {
 
     public static final String INDEXING_TARGET = "indexingTarget";
 
+    public static final String NUM_OF_THREADS = "numOfThreads";
+
     public static final String BASIC = "BASIC";
 
     public static final String DIGEST = "DIGEST";

+ 1 - 1
src/main/java/org/codelibs/fess/ds/IndexUpdateCallback.java

@@ -19,7 +19,7 @@ import java.util.Map;
 
 public interface IndexUpdateCallback {
 
-    boolean store(Map<String, String> paramMap, Map<String, Object> dataMap);
+    void store(Map<String, String> paramMap, Map<String, Object> dataMap);
 
     long getDocumentSize();
 

+ 8 - 4
src/main/java/org/codelibs/fess/ds/impl/CsvDataStoreImpl.java

@@ -34,10 +34,10 @@ import org.codelibs.fess.Constants;
 import org.codelibs.fess.app.service.FailureUrlService;
 import org.codelibs.fess.crawler.exception.CrawlingAccessException;
 import org.codelibs.fess.crawler.exception.MultipleCrawlingAccessException;
-import org.codelibs.fess.ds.DataStoreCrawlingException;
-import org.codelibs.fess.ds.DataStoreException;
 import org.codelibs.fess.ds.IndexUpdateCallback;
 import org.codelibs.fess.es.config.exentity.DataConfig;
+import org.codelibs.fess.exception.DataStoreCrawlingException;
+import org.codelibs.fess.exception.DataStoreException;
 import org.codelibs.fess.util.ComponentUtil;
 import org.codelibs.fess.util.StreamUtil;
 import org.slf4j.Logger;
@@ -236,7 +236,7 @@ public class CsvDataStoreImpl extends AbstractDataStoreImpl {
                 }
 
                 try {
-                    loop = callback.store(paramMap, dataMap);
+                    callback.store(paramMap, dataMap);
                 } catch (final CrawlingAccessException e) {
                     logger.warn("Crawling Access Exception at : " + dataMap, e);
 
@@ -258,7 +258,11 @@ public class CsvDataStoreImpl extends AbstractDataStoreImpl {
 
                     String url;
                     if (target instanceof DataStoreCrawlingException) {
-                        url = ((DataStoreCrawlingException) target).getUrl();
+                        DataStoreCrawlingException dce = (DataStoreCrawlingException) target;
+                        url = dce.getUrl();
+                        if (dce.aborted()) {
+                            loop = false;
+                        }
                     } else {
                         url = csvFile.getAbsolutePath() + ":" + csvReader.getLineNumber();
                     }

+ 17 - 6
src/main/java/org/codelibs/fess/ds/impl/CsvListDataStoreImpl.java

@@ -18,10 +18,11 @@ package org.codelibs.fess.ds.impl;
 import java.io.File;
 import java.util.Map;
 
+import org.codelibs.fess.Constants;
 import org.codelibs.fess.crawler.client.CrawlerClientFactory;
-import org.codelibs.fess.ds.DataStoreException;
 import org.codelibs.fess.ds.IndexUpdateCallback;
 import org.codelibs.fess.es.config.exentity.DataConfig;
+import org.codelibs.fess.exception.DataStoreException;
 import org.codelibs.fess.util.ComponentUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,13 +52,23 @@ public class CsvListDataStoreImpl extends CsvDataStoreImpl {
     @Override
     protected void storeData(final DataConfig dataConfig, final IndexUpdateCallback callback, final Map<String, String> paramMap,
             final Map<String, String> scriptMap, final Map<String, Object> defaultDataMap) {
-
+        // Thread count handed to the file-list callback; stays at 1 when the parameter is absent or not an int.
+        int nThreads = 1;
+        if (paramMap.containsKey(Constants.NUM_OF_THREADS)) {
+            try {
+                nThreads = Integer.parseInt(paramMap.get(Constants.NUM_OF_THREADS));
+            } catch (final NumberFormatException e) {
+                logger.warn(Constants.NUM_OF_THREADS + " is not int value.", e);
+            }
+        }
         final CrawlerClientFactory crawlerClientFactory = ComponentUtil.getCrawlerClientFactory();
         dataConfig.initializeClientFactory(crawlerClientFactory);
-        final FileListIndexUpdateCallbackImpl fileListIndexUpdateCallback =
-                new FileListIndexUpdateCallbackImpl(callback, crawlerClientFactory);
-        super.storeData(dataConfig, fileListIndexUpdateCallback, paramMap, scriptMap, defaultDataMap);
-        fileListIndexUpdateCallback.commit();
+        // try-with-resources closes the callback on every exit path, including failures below.
+        try (final FileListIndexUpdateCallbackImpl fileListIndexUpdateCallback =
+                new FileListIndexUpdateCallbackImpl(callback, crawlerClientFactory, nThreads)) {
+            super.storeData(dataConfig, fileListIndexUpdateCallback, paramMap, scriptMap, defaultDataMap);
+            fileListIndexUpdateCallback.commit();
+        } catch (final DataStoreException e) {
+            // Already the exception type thrown by this method; rethrow as-is rather than nesting it in another one.
+            throw e;
+        } catch (final Exception e) {
+            throw new DataStoreException(e);
+        }
     }
 
     @Override

+ 62 - 21
src/main/java/org/codelibs/fess/ds/impl/DatabaseDataStoreImpl.java

@@ -27,10 +27,15 @@ import java.util.Map;
 import java.util.Set;
 
 import org.codelibs.core.lang.StringUtil;
-import org.codelibs.fess.ds.DataStoreException;
+import org.codelibs.fess.app.service.FailureUrlService;
+import org.codelibs.fess.crawler.exception.CrawlingAccessException;
+import org.codelibs.fess.crawler.exception.MultipleCrawlingAccessException;
 import org.codelibs.fess.ds.IndexUpdateCallback;
 import org.codelibs.fess.es.config.exentity.DataConfig;
+import org.codelibs.fess.exception.DataStoreCrawlingException;
+import org.codelibs.fess.exception.DataStoreException;
 import org.codelibs.fess.exception.FessSystemException;
+import org.codelibs.fess.util.ComponentUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -85,19 +90,20 @@ public class DatabaseDataStoreImpl extends AbstractDataStoreImpl {
         Statement stmt = null;
         ResultSet rs = null;
         try {
-            Class.forName(getDriverClass(paramMap)); // TODO not needed on java6?
+            Class.forName(getDriverClass(paramMap));
 
-            final String url = getUrl(paramMap);
+            final String jdbcUrl = getUrl(paramMap);
             final String username = getUsername(paramMap);
             final String password = getPassword(paramMap);
             if (StringUtil.isNotEmpty(username)) {
-                con = DriverManager.getConnection(url, username, password);
+                con = DriverManager.getConnection(jdbcUrl, username, password);
             } else {
-                con = DriverManager.getConnection(url);
+                con = DriverManager.getConnection(jdbcUrl);
             }
 
+            final String sql = getSql(paramMap);
             stmt = con.createStatement();
-            rs = stmt.executeQuery(getSql(paramMap)); // SQL generated by an administrator
+            rs = stmt.executeQuery(sql); // SQL generated by an administrator
             boolean loop = true;
             while (rs.next() && loop && alive) {
                 final Map<String, Object> dataMap = new HashMap<String, Object>();
@@ -110,9 +116,44 @@ public class DatabaseDataStoreImpl extends AbstractDataStoreImpl {
                 }
 
                 try {
-                    loop = callback.store(paramMap, dataMap);
-                } catch (final Exception e) {
-                    logger.warn("Failed to store data: " + dataMap, e);
+                    callback.store(paramMap, dataMap);
+                } catch (final CrawlingAccessException e) {
+                    logger.warn("Crawling Access Exception at : " + dataMap, e);
+
+                    Throwable target = e;
+                    if (target instanceof MultipleCrawlingAccessException) {
+                        final Throwable[] causes = ((MultipleCrawlingAccessException) target).getCauses();
+                        if (causes.length > 0) {
+                            target = causes[causes.length - 1];
+                        }
+                    }
+
+                    String errorName;
+                    final Throwable cause = target.getCause();
+                    if (cause != null) {
+                        errorName = cause.getClass().getCanonicalName();
+                    } else {
+                        errorName = target.getClass().getCanonicalName();
+                    }
+
+                    String url;
+                    if (target instanceof DataStoreCrawlingException) {
+                        DataStoreCrawlingException dce = (DataStoreCrawlingException) target;
+                        url = dce.getUrl();
+                        if (dce.aborted()) {
+                            loop = false;
+                        }
+                    } else {
+                        url = sql + ":" + rs.getRow();
+                    }
+                    final FailureUrlService failureUrlService = ComponentUtil.getComponent(FailureUrlService.class);
+                    failureUrlService.store(config, errorName, url, target);
+                } catch (final Exception | OutOfMemoryError e) {
+                    final String url = sql + ":" + rs.getRow();
+                    final FailureUrlService failureUrlService = ComponentUtil.getComponent(FailureUrlService.class);
+                    failureUrlService.store(config, e.getClass().getCanonicalName(), url, e);
+
+                    logger.warn("Crawling Access Exception at : " + dataMap, e);
                 }
 
                 if (readInterval > 0) {
@@ -150,13 +191,13 @@ public class DatabaseDataStoreImpl extends AbstractDataStoreImpl {
     }
 
     protected Object convertValue(final String template, final ResultSet rs, final Map<String, String> paramMap) {
-        return convertValue(template, new ResultSetParamMap<String, String>(rs, paramMap));
+        return convertValue(template, new ResultSetParamMap(rs, paramMap));
     }
 
-    protected static class ResultSetParamMap<K, V> implements Map<K, V> {
-        private final Map<K, V> paramMap = new HashMap<K, V>();
+    protected static class ResultSetParamMap implements Map<String, String> {
+        private final Map<String, String> paramMap = new HashMap<>();
 
-        public ResultSetParamMap(final ResultSet resultSet, final Map<K, V> paramMap) {
+        public ResultSetParamMap(final ResultSet resultSet, final Map<String, String> paramMap) {
             this.paramMap.putAll(paramMap);
 
             try {
@@ -166,7 +207,7 @@ public class DatabaseDataStoreImpl extends AbstractDataStoreImpl {
                     try {
                         final String label = metaData.getColumnLabel(i + 1);
                         final String value = resultSet.getString(i + 1);
-                        this.paramMap.put((K) label, (V) value);
+                        this.paramMap.put(label, value);
                     } catch (final SQLException e) {
                         logger.warn("Failed to parse data in a result set. The column is " + (i + 1) + ".", e);
                     }
@@ -193,12 +234,12 @@ public class DatabaseDataStoreImpl extends AbstractDataStoreImpl {
         }
 
         @Override
-        public Set<java.util.Map.Entry<K, V>> entrySet() {
+        public Set<java.util.Map.Entry<String, String>> entrySet() {
             return paramMap.entrySet();
         }
 
         @Override
-        public V get(final Object key) {
+        public String get(final Object key) {
             return paramMap.get(key);
         }
 
@@ -208,22 +249,22 @@ public class DatabaseDataStoreImpl extends AbstractDataStoreImpl {
         }
 
         @Override
-        public Set<K> keySet() {
+        public Set<String> keySet() {
             return paramMap.keySet();
         }
 
         @Override
-        public V put(final K key, final V value) {
+        public String put(final String key, final String value) {
             return paramMap.put(key, value);
         }
 
         @Override
-        public void putAll(final Map<? extends K, ? extends V> m) {
+        public void putAll(final Map<? extends String, ? extends String> m) {
             paramMap.putAll(m);
         }
 
         @Override
-        public V remove(final Object key) {
+        public String remove(final Object key) {
             return paramMap.remove(key);
         }
 
@@ -233,7 +274,7 @@ public class DatabaseDataStoreImpl extends AbstractDataStoreImpl {
         }
 
         @Override
-        public Collection<V> values() {
+        public Collection<String> values() {
             return paramMap.values();
         }
 

+ 8 - 4
src/main/java/org/codelibs/fess/ds/impl/EsDataStoreImpl.java

@@ -26,10 +26,10 @@ import org.codelibs.fess.Constants;
 import org.codelibs.fess.app.service.FailureUrlService;
 import org.codelibs.fess.crawler.exception.CrawlingAccessException;
 import org.codelibs.fess.crawler.exception.MultipleCrawlingAccessException;
-import org.codelibs.fess.ds.DataStoreCrawlingException;
-import org.codelibs.fess.ds.DataStoreException;
 import org.codelibs.fess.ds.IndexUpdateCallback;
 import org.codelibs.fess.es.config.exentity.DataConfig;
+import org.codelibs.fess.exception.DataStoreCrawlingException;
+import org.codelibs.fess.exception.DataStoreException;
 import org.codelibs.fess.util.ComponentUtil;
 import org.codelibs.fess.util.StreamUtil;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
@@ -182,7 +182,7 @@ public class EsDataStoreImpl extends AbstractDataStoreImpl {
                     }
 
                     try {
-                        loop = callback.store(paramMap, dataMap);
+                        callback.store(paramMap, dataMap);
                     } catch (final CrawlingAccessException e) {
                         logger.warn("Crawling Access Exception at : " + dataMap, e);
 
@@ -204,7 +204,11 @@ public class EsDataStoreImpl extends AbstractDataStoreImpl {
 
                         String url;
                         if (target instanceof DataStoreCrawlingException) {
-                            url = ((DataStoreCrawlingException) target).getUrl();
+                            DataStoreCrawlingException dce = (DataStoreCrawlingException) target;
+                            url = dce.getUrl();
+                            if (dce.aborted()) {
+                                loop = false;
+                            }
                         } else {
                             url = hit.getIndex() + "/" + hit.getType() + "/" + hit.getId();
                         }

+ 20 - 5
src/main/java/org/codelibs/fess/ds/impl/EsListDataStoreImpl.java

@@ -17,23 +17,38 @@ package org.codelibs.fess.ds.impl;
 
 import java.util.Map;
 
+import org.codelibs.fess.Constants;
 import org.codelibs.fess.crawler.client.CrawlerClientFactory;
 import org.codelibs.fess.ds.IndexUpdateCallback;
 import org.codelibs.fess.es.config.exentity.DataConfig;
+import org.codelibs.fess.exception.DataStoreException;
 import org.codelibs.fess.util.ComponentUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class EsListDataStoreImpl extends EsDataStoreImpl {
+    private static final Logger logger = LoggerFactory.getLogger(EsListDataStoreImpl.class);
 
     @Override
     protected void storeData(final DataConfig dataConfig, final IndexUpdateCallback callback, final Map<String, String> paramMap,
             final Map<String, String> scriptMap, final Map<String, Object> defaultDataMap) {
-
+        // Thread count handed to the file-list callback; stays at 1 when the parameter is absent or not an int.
+        int nThreads = 1;
+        if (paramMap.containsKey(Constants.NUM_OF_THREADS)) {
+            try {
+                nThreads = Integer.parseInt(paramMap.get(Constants.NUM_OF_THREADS));
+            } catch (final NumberFormatException e) {
+                logger.warn(Constants.NUM_OF_THREADS + " is not int value.", e);
+            }
+        }
         final CrawlerClientFactory crawlerClientFactory = ComponentUtil.getCrawlerClientFactory();
         dataConfig.initializeClientFactory(crawlerClientFactory);
-        final FileListIndexUpdateCallbackImpl fileListIndexUpdateCallback =
-                new FileListIndexUpdateCallbackImpl(callback, crawlerClientFactory);
-        super.storeData(dataConfig, fileListIndexUpdateCallback, paramMap, scriptMap, defaultDataMap);
-        fileListIndexUpdateCallback.commit();
+        // try-with-resources closes the callback on every exit path, including failures below.
+        try (final FileListIndexUpdateCallbackImpl fileListIndexUpdateCallback =
+                new FileListIndexUpdateCallbackImpl(callback, crawlerClientFactory, nThreads)) {
+            super.storeData(dataConfig, fileListIndexUpdateCallback, paramMap, scriptMap, defaultDataMap);
+            fileListIndexUpdateCallback.commit();
+        } catch (final DataStoreException e) {
+            // Already the exception type thrown by this method; rethrow as-is rather than nesting it in another one.
+            throw e;
+        } catch (final Exception e) {
+            throw new DataStoreException(e);
+        }
     }
 
 }

+ 0 - 289
src/main/java/org/codelibs/fess/ds/impl/FileListDataStoreImpl.java

@@ -1,289 +0,0 @@
-/*
- * Copyright 2012-2016 CodeLibs Project and the Others.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language
- * governing permissions and limitations under the License.
- */
-package org.codelibs.fess.ds.impl;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import org.codelibs.core.collection.LruHashMap;
-import org.codelibs.core.io.SerializeUtil;
-import org.codelibs.fess.Constants;
-import org.codelibs.fess.crawler.builder.RequestDataBuilder;
-import org.codelibs.fess.crawler.client.CrawlerClient;
-import org.codelibs.fess.crawler.client.CrawlerClientFactory;
-import org.codelibs.fess.crawler.entity.ResponseData;
-import org.codelibs.fess.crawler.entity.ResultData;
-import org.codelibs.fess.crawler.exception.CrawlerSystemException;
-import org.codelibs.fess.crawler.processor.ResponseProcessor;
-import org.codelibs.fess.crawler.processor.impl.DefaultResponseProcessor;
-import org.codelibs.fess.crawler.rule.Rule;
-import org.codelibs.fess.crawler.rule.RuleManager;
-import org.codelibs.fess.crawler.transformer.Transformer;
-import org.codelibs.fess.ds.DataStoreCrawlingException;
-import org.codelibs.fess.ds.DataStoreException;
-import org.codelibs.fess.ds.IndexUpdateCallback;
-import org.codelibs.fess.es.client.FessEsClient;
-import org.codelibs.fess.es.config.exentity.DataConfig;
-import org.codelibs.fess.helper.CrawlingInfoHelper;
-import org.codelibs.fess.helper.IndexingHelper;
-import org.codelibs.fess.mylasta.direction.FessConfig;
-import org.codelibs.fess.util.ComponentUtil;
-import org.lastaflute.di.core.SingletonLaContainer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.orangesignal.csv.CsvConfig;
-
-@Deprecated
-// replace with CsvListDataStoreImpl
-public class FileListDataStoreImpl extends CsvDataStoreImpl {
-
-    private static final Logger logger = LoggerFactory.getLogger(FileListDataStoreImpl.class);
-
-    public boolean deleteProcessedFile = true;
-
-    public long csvFileTimestampMargin = 60 * 1000;// 1min
-
-    public boolean ignoreDataStoreException = true;
-
-    public String createEventName = "create";
-
-    public String modifyEventName = "modify";
-
-    public String deleteEventName = "delete";
-
-    public String eventTypeField = "event_type";
-
-    public int maxDeleteDocumentCacheSize = 100;
-
-    protected CrawlerClientFactory crawlerClientFactory;
-
-    protected CrawlingInfoHelper crawlingInfoHelper;
-
-    public Map<String, String> parentEncodingMap = Collections.synchronizedMap(new LruHashMap<>(1000));
-
-    public String[] ignoreFieldNames = new String[] { Constants.INDEXING_TARGET, Constants.SESSION_ID };
-
-    @Override
-    protected boolean isCsvFile(final File parentFile, final String filename) {
-        if (super.isCsvFile(parentFile, filename)) {
-            final File file = new File(parentFile, filename);
-            final long now = System.currentTimeMillis();
-            return now - file.lastModified() > csvFileTimestampMargin;
-        }
-        return false;
-    }
-
-    @Override
-    public void store(final DataConfig config, final IndexUpdateCallback callback, final Map<String, String> initParamMap) {
-
-        crawlerClientFactory = SingletonLaContainer.getComponent(CrawlerClientFactory.class);
-
-        config.initializeClientFactory(crawlerClientFactory);
-
-        super.store(config, callback, initParamMap);
-    }
-
-    @Override
-    protected void storeData(final DataConfig dataConfig, final IndexUpdateCallback callback, final Map<String, String> paramMap,
-            final Map<String, String> scriptMap, final Map<String, Object> defaultDataMap) {
-
-        final FileListIndexUpdateCallback fileListIndexUpdateCallback = new FileListIndexUpdateCallback(callback);
-        super.storeData(dataConfig, fileListIndexUpdateCallback, paramMap, scriptMap, defaultDataMap);
-        fileListIndexUpdateCallback.commit();
-    }
-
-    @Override
-    protected void processCsv(final DataConfig dataConfig, final IndexUpdateCallback callback, final Map<String, String> paramMap,
-            final Map<String, String> scriptMap, final Map<String, Object> defaultDataMap, final CsvConfig csvConfig, final File csvFile,
-            final long readInterval, final String csvFileEncoding, final boolean hasHeaderLine) {
-        try {
-            super.processCsv(dataConfig, callback, paramMap, scriptMap, defaultDataMap, csvConfig, csvFile, readInterval, csvFileEncoding,
-                    hasHeaderLine);
-
-            // delete csv file
-            if (deleteProcessedFile && !csvFile.delete()) {
-                logger.warn("Failed to delete {}", csvFile.getAbsolutePath());
-            }
-        } catch (final DataStoreException e) {
-            if (ignoreDataStoreException) {
-                logger.error("Failed to process " + csvFile.getAbsolutePath(), e);
-                // rename csv file, or delete it if failed
-                if (!csvFile.renameTo(new File(csvFile.getParent(), csvFile.getName() + ".txt")) && !csvFile.delete()) {
-                    logger.warn("Failed to delete {}", csvFile.getAbsolutePath());
-                }
-            } else {
-                throw e;
-            }
-        }
-    }
-
-    protected class FileListIndexUpdateCallback implements IndexUpdateCallback {
-        protected IndexUpdateCallback indexUpdateCallback;
-
-        protected List<String> deleteIdList = new ArrayList<String>();
-
-        protected FileListIndexUpdateCallback(final IndexUpdateCallback indexUpdateCallback) {
-            this.indexUpdateCallback = indexUpdateCallback;
-
-        }
-
-        @Override
-        public boolean store(final Map<String, String> paramMap, final Map<String, Object> dataMap) {
-            final Object eventType = dataMap.remove(eventTypeField);
-
-            if (createEventName.equals(eventType) || modifyEventName.equals(eventType)) {
-                // updated file
-                return addDocument(paramMap, dataMap);
-            } else if (deleteEventName.equals(eventType)) {
-                // deleted file
-                return deleteDocument(paramMap, dataMap);
-            }
-
-            logger.warn("unknown event: " + eventType + ", data: " + dataMap);
-            return false;
-        }
-
-        protected boolean addDocument(final Map<String, String> paramMap, final Map<String, Object> dataMap) {
-            final FessConfig fessConfig = ComponentUtil.getFessConfig();
-            synchronized (indexUpdateCallback) {
-                // required check
-                if (!dataMap.containsKey(fessConfig.getIndexFieldUrl()) || dataMap.get(fessConfig.getIndexFieldUrl()) == null) {
-                    logger.warn("Could not add a doc. Invalid data: " + dataMap);
-                    return false;
-                }
-
-                final String url = dataMap.get(fessConfig.getIndexFieldUrl()).toString();
-                try {
-                    final CrawlerClient client = crawlerClientFactory.getClient(url);
-                    if (client == null) {
-                        logger.warn("CrawlerClient is null. Data: " + dataMap);
-                        return false;
-                    }
-
-                    final long startTime = System.currentTimeMillis();
-                    final ResponseData responseData = client.execute(RequestDataBuilder.newRequestData().get().url(url).build());
-                    responseData.setExecutionTime(System.currentTimeMillis() - startTime);
-                    if (dataMap.containsKey(Constants.SESSION_ID)) {
-                        responseData.setSessionId((String) dataMap.get(Constants.SESSION_ID));
-                    } else {
-                        responseData.setSessionId((String) paramMap.get(Constants.CRAWLING_INFO_ID));
-                    }
-
-                    final RuleManager ruleManager = SingletonLaContainer.getComponent(RuleManager.class);
-                    final Rule rule = ruleManager.getRule(responseData);
-                    if (rule == null) {
-                        logger.warn("No url rule. Data: " + dataMap);
-                        return false;
-                    } else {
-                        responseData.setRuleId(rule.getRuleId());
-                        final ResponseProcessor responseProcessor = rule.getResponseProcessor();
-                        if (responseProcessor instanceof DefaultResponseProcessor) {
-                            final Transformer transformer = ((DefaultResponseProcessor) responseProcessor).getTransformer();
-                            final ResultData resultData = transformer.transform(responseData);
-                            final byte[] data = resultData.getData();
-                            if (data != null) {
-                                try {
-                                    @SuppressWarnings("unchecked")
-                                    final Map<String, Object> responseDataMap =
-                                            (Map<String, Object>) SerializeUtil.fromBinaryToObject(data);
-                                    dataMap.putAll(responseDataMap);
-                                } catch (final Exception e) {
-                                    throw new CrawlerSystemException("Could not create an instance from bytes.", e);
-                                }
-                            }
-
-                            // remove
-                            for (final String fieldName : ignoreFieldNames) {
-                                dataMap.remove(fieldName);
-                            }
-
-                            return indexUpdateCallback.store(paramMap, dataMap);
-                        } else {
-                            logger.warn("The response processor is not DefaultResponseProcessor. responseProcessor: " + responseProcessor
-                                    + ", Data: " + dataMap);
-                            return false;
-                        }
-                    }
-                } catch (final Exception e) {
-                    throw new DataStoreCrawlingException(url, "Failed to add: " + dataMap, e);
-                }
-            }
-        }
-
-        protected boolean deleteDocument(final Map<String, String> paramMap, final Map<String, Object> dataMap) {
-
-            if (logger.isDebugEnabled()) {
-                logger.debug("Deleting " + dataMap);
-            }
-
-            final FessConfig fessConfig = ComponentUtil.getFessConfig();
-
-            // required check
-            if (!dataMap.containsKey(fessConfig.getIndexFieldUrl()) || dataMap.get(fessConfig.getIndexFieldUrl()) == null) {
-                logger.warn("Could not delete a doc. Invalid data: " + dataMap);
-                return false;
-            }
-
-            synchronized (indexUpdateCallback) {
-                deleteIdList.add(crawlingInfoHelper.generateId(dataMap));
-
-                if (deleteIdList.size() >= maxDeleteDocumentCacheSize) {
-                    final FessEsClient fessEsClient = ComponentUtil.getElasticsearchClient();
-                    final IndexingHelper indexingHelper = ComponentUtil.getIndexingHelper();
-                    for (final String id : deleteIdList) {
-                        indexingHelper.deleteDocument(fessEsClient, id);
-                    }
-                    if (logger.isDebugEnabled()) {
-                        logger.debug("Deleted " + deleteIdList);
-                    }
-                    deleteIdList.clear();
-                }
-
-            }
-            return true;
-        }
-
-        @Override
-        public void commit() {
-            if (!deleteIdList.isEmpty()) {
-                final FessEsClient fessEsClient = ComponentUtil.getElasticsearchClient();
-                final IndexingHelper indexingHelper = ComponentUtil.getIndexingHelper();
-                for (final String id : deleteIdList) {
-                    indexingHelper.deleteDocument(fessEsClient, id);
-                }
-                if (logger.isDebugEnabled()) {
-                    logger.debug("Deleted " + deleteIdList);
-                }
-            }
-            indexUpdateCallback.commit();
-        }
-
-        @Override
-        public long getDocumentSize() {
-            return indexUpdateCallback.getDocumentSize();
-        }
-
-        @Override
-        public long getExecuteTime() {
-            return indexUpdateCallback.getExecuteTime();
-        }
-
-    }
-}

+ 49 - 23
src/main/java/org/codelibs/fess/ds/impl/FileListIndexUpdateCallbackImpl.java

@@ -18,6 +18,10 @@ package org.codelibs.fess.ds.impl;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.codelibs.core.io.SerializeUtil;
 import org.codelibs.fess.Constants;
@@ -32,9 +36,9 @@ import org.codelibs.fess.crawler.processor.impl.DefaultResponseProcessor;
 import org.codelibs.fess.crawler.rule.Rule;
 import org.codelibs.fess.crawler.rule.RuleManager;
 import org.codelibs.fess.crawler.transformer.Transformer;
-import org.codelibs.fess.ds.DataStoreCrawlingException;
 import org.codelibs.fess.ds.IndexUpdateCallback;
 import org.codelibs.fess.es.client.FessEsClient;
+import org.codelibs.fess.exception.DataStoreCrawlingException;
 import org.codelibs.fess.helper.IndexingHelper;
 import org.codelibs.fess.mylasta.direction.FessConfig;
 import org.codelibs.fess.util.ComponentUtil;
@@ -43,7 +47,7 @@ import org.lastaflute.di.core.SingletonLaContainer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class FileListIndexUpdateCallbackImpl implements IndexUpdateCallback {
+public class FileListIndexUpdateCallbackImpl implements IndexUpdateCallback, AutoCloseable {
     private static final Logger logger = LoggerFactory.getLogger(FileListIndexUpdateCallbackImpl.class);
 
     protected IndexUpdateCallback indexUpdateCallback;
@@ -54,40 +58,51 @@ public class FileListIndexUpdateCallbackImpl implements IndexUpdateCallback {
 
     protected int maxDeleteDocumentCacheSize = 100;
 
-    protected FileListIndexUpdateCallbackImpl(final IndexUpdateCallback indexUpdateCallback, final CrawlerClientFactory crawlerClientFactory) {
+    private ExecutorService executor;
+
+    protected FileListIndexUpdateCallbackImpl(final IndexUpdateCallback indexUpdateCallback,
+            final CrawlerClientFactory crawlerClientFactory, final int nThreads) { // nThreads: requested size of the worker pool
         this.indexUpdateCallback = indexUpdateCallback;
         this.crawlerClientFactory = crawlerClientFactory;
+        executor = newFixedThreadPool(nThreads < 1 ? 1 : nThreads); // values below 1 fall back to one worker; NOTE(review): overridable method called from the constructor
     }
 
-    @Override
-    public boolean store(final Map<String, String> paramMap, final Map<String, Object> dataMap) {
-        final Object eventType = dataMap.remove(getParamValue(paramMap, "field.event_type", "event_type"));
-
-        if (getParamValue(paramMap, "event.create", "create").equals(eventType)
-                || getParamValue(paramMap, "event.modify", "modify").equals(eventType)) {
-            // updated file
-            return addDocument(paramMap, dataMap);
-        } else if (getParamValue(paramMap, "event.delete", "delete").equals(eventType)) {
-            // deleted file
-            return deleteDocument(paramMap, dataMap);
+    protected ExecutorService newFixedThreadPool(int nThreads) { // creates the executor; nThreads is used as core size, max size and queue capacity
+        if (logger.isDebugEnabled()) {
+            logger.debug("Executor Thread Pool: " + nThreads);
         }
+        return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(nThreads), // bounded queue: at most nThreads tasks wait
+                new ThreadPoolExecutor.CallerRunsPolicy()); // a task rejected because pool and queue are full is run by the submitting thread
+    }
 
-        logger.warn("unknown event: " + eventType + ", data: " + dataMap);
-        // don't stop crawling
-        return true;
+    @Override
+    public void store(final Map<String, String> paramMap, final Map<String, Object> dataMap) { // submits the handling of one event to the executor
+        executor.execute(() -> { // NOTE(review): an exception thrown by this task on a pool thread does not propagate to the caller of store()
+            final Object eventType = dataMap.remove(getParamValue(paramMap, "field.event_type", "event_type")); // removes the event-type entry from the caller's dataMap
+            if (getParamValue(paramMap, "event.create", "create").equals(eventType)
+                    || getParamValue(paramMap, "event.modify", "modify").equals(eventType)) {
+                // created or modified file
+                addDocument(paramMap, dataMap);
+            } else if (getParamValue(paramMap, "event.delete", "delete").equals(eventType)) {
+                // deleted file
+                deleteDocument(paramMap, dataMap);
+            } else { // any other event type is only logged
+                logger.warn("unknown event: " + eventType + ", data: " + dataMap);
+            }
+        });
     }
 
     protected String getParamValue(Map<String, String> paramMap, String key, String defaultValue) { // value mapped to key, or defaultValue when paramMap has no mapping for key
         return paramMap.getOrDefault(key, defaultValue);
     }
 
-    protected boolean addDocument(final Map<String, String> paramMap, final Map<String, Object> dataMap) {
+    protected void addDocument(final Map<String, String> paramMap, final Map<String, Object> dataMap) {
         final FessConfig fessConfig = ComponentUtil.getFessConfig();
         synchronized (indexUpdateCallback) {
             // required check
             if (!dataMap.containsKey(fessConfig.getIndexFieldUrl()) || dataMap.get(fessConfig.getIndexFieldUrl()) == null) {
                 logger.warn("Could not add a doc. Invalid data: " + dataMap);
-                return false;
+                return;
             }
 
             final String url = dataMap.get(fessConfig.getIndexFieldUrl()).toString();
@@ -95,7 +110,7 @@ public class FileListIndexUpdateCallbackImpl implements IndexUpdateCallback {
                 final CrawlerClient client = crawlerClientFactory.getClient(url);
                 if (client == null) {
                     logger.warn("CrawlerClient is null. Data: " + dataMap);
-                    return false;
+                    return;
                 }
 
                 final long startTime = System.currentTimeMillis();
@@ -111,7 +126,6 @@ public class FileListIndexUpdateCallbackImpl implements IndexUpdateCallback {
                 final Rule rule = ruleManager.getRule(responseData);
                 if (rule == null) {
                     logger.warn("No url rule. Data: " + dataMap);
-                    return false;
                 } else {
                     responseData.setRuleId(rule.getRuleId());
                     final ResponseProcessor responseProcessor = rule.getResponseProcessor();
@@ -138,11 +152,10 @@ public class FileListIndexUpdateCallbackImpl implements IndexUpdateCallback {
                         }
                         StreamUtil.of(ignoreFields).map(s -> s.trim()).forEach(s -> dataMap.remove(s));
 
-                        return indexUpdateCallback.store(paramMap, dataMap);
+                        indexUpdateCallback.store(paramMap, dataMap);
                     } else {
                         logger.warn("The response processor is not DefaultResponseProcessor. responseProcessor: " + responseProcessor
                                 + ", Data: " + dataMap);
-                        return false;
                     }
                 }
             } catch (final Exception e) {
@@ -212,4 +225,17 @@ public class FileListIndexUpdateCallbackImpl implements IndexUpdateCallback {
     public void setMaxDeleteDocumentCacheSize(int maxDeleteDocumentCacheSize) { // replaces the field's initial value of 100
         this.maxDeleteDocumentCacheSize = maxDeleteDocumentCacheSize;
     }
+
+    /**
+     * Stops the worker pool: no new tasks are accepted, tasks already submitted get up to
+     * 60 seconds to finish, and whatever is still running or queued afterwards is cancelled.
+     * A timeout and any discarded tasks are reported at WARN level.
+     *
+     * @throws Exception if the calling thread is interrupted while waiting for termination
+     */
+    @Override
+    public void close() throws Exception {
+        try {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Shutting down thread executor.");
+            }
+            executor.shutdown();
+            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
+                // report the timeout instead of letting it pass unnoticed
+                logger.warn("Thread executor did not terminate in 60 seconds. Remaining tasks are cancelled.");
+            }
+        } finally {
+            // returns an empty list after a clean termination; otherwise interrupts workers and drains the queue
+            final List<Runnable> discarded = executor.shutdownNow();
+            if (!discarded.isEmpty()) {
+                logger.warn(discarded.size() + " queued task(s) were discarded without being executed.");
+            }
+        }
+    }
 }

+ 3 - 4
src/main/java/org/codelibs/fess/ds/impl/IndexUpdateCallbackImpl.java

@@ -22,7 +22,7 @@ import javax.annotation.PostConstruct;
 
 import org.codelibs.fess.ds.IndexUpdateCallback;
 import org.codelibs.fess.es.client.FessEsClient;
-import org.codelibs.fess.exception.FessSystemException;
+import org.codelibs.fess.exception.DataStoreException;
 import org.codelibs.fess.helper.CrawlingInfoHelper;
 import org.codelibs.fess.helper.IndexingHelper;
 import org.codelibs.fess.helper.SearchLogHelper;
@@ -54,7 +54,7 @@ public class IndexUpdateCallbackImpl implements IndexUpdateCallback {
      * @see org.codelibs.fess.ds.IndexUpdateCallback#store(java.util.Map, java.util.Map)
      */
     @Override
-    public boolean store(final Map<String, String> paramMap, final Map<String, Object> dataMap) {
+    public void store(final Map<String, String> paramMap, final Map<String, Object> dataMap) {
         final long startTime = System.currentTimeMillis();
         final FessConfig fessConfig = ComponentUtil.getFessConfig();
         final FessEsClient fessEsClient = ComponentUtil.getElasticsearchClient();
@@ -66,7 +66,7 @@ public class IndexUpdateCallbackImpl implements IndexUpdateCallback {
         //   required check
         final Object urlObj = dataMap.get(fessConfig.getIndexFieldUrl());
         if (urlObj == null) {
-            throw new FessSystemException("url is null. dataMap=" + dataMap);
+            throw new DataStoreException("url is null. dataMap=" + dataMap);
         }
 
         final IndexingHelper indexingHelper = ComponentUtil.getIndexingHelper();
@@ -112,7 +112,6 @@ public class IndexUpdateCallbackImpl implements IndexUpdateCallback {
             logger.debug("The number of an added document is " + documentSize.get() + ".");
         }
 
-        return true;
     }
 
     @Override

+ 11 - 1
src/main/java/org/codelibs/fess/ds/DataStoreCrawlingException.java → src/main/java/org/codelibs/fess/exception/DataStoreCrawlingException.java

@@ -13,7 +13,7 @@
  * either express or implied. See the License for the specific language
  * governing permissions and limitations under the License.
  */
-package org.codelibs.fess.ds;
+package org.codelibs.fess.exception;
 
 import org.codelibs.fess.crawler.exception.CrawlingAccessException;
 
@@ -23,13 +23,23 @@ public class DataStoreCrawlingException extends CrawlingAccessException {
 
     private final String url;
 
+    private boolean abort;
+
     public DataStoreCrawlingException(final String url, final String message, final Exception e) {
+        this(url, message, e, false); // delegates to the 4-argument constructor with abort = false
+    }
+
+    public DataStoreCrawlingException(final String url, final String message, final Exception e, final boolean abort) { // abort: value later returned by aborted()
         super(message, e);
         this.url = url;
+        this.abort = abort;
     }
 
     public String getUrl() { // url passed to the constructor
         return url;
     }
 
+    public boolean aborted() { // abort flag given at construction; false when the 3-argument constructor was used
+        return abort;
+    }
 }

+ 1 - 3
src/main/java/org/codelibs/fess/ds/DataStoreException.java → src/main/java/org/codelibs/fess/exception/DataStoreException.java

@@ -13,9 +13,7 @@
  * either express or implied. See the License for the specific language
  * governing permissions and limitations under the License.
  */
-package org.codelibs.fess.ds;
-
-import org.codelibs.fess.exception.FessSystemException;
+package org.codelibs.fess.exception;
 
 public class DataStoreException extends FessSystemException {
     private static final long serialVersionUID = 1L;