fix #656 and #655 delete docs by url and wait for executor shutdown

This commit is contained in:
Shinsuke Sugaya 2016-08-28 23:02:46 +09:00
parent 6d5098c45f
commit 59f45a7ecb
4 changed files with 51 additions and 38 deletions

View file

@ -62,8 +62,9 @@ public class CsvListDataStoreImpl extends CsvDataStoreImpl {
}
final CrawlerClientFactory crawlerClientFactory = ComponentUtil.getCrawlerClientFactory();
dataConfig.initializeClientFactory(crawlerClientFactory);
try (final FileListIndexUpdateCallbackImpl fileListIndexUpdateCallback =
new FileListIndexUpdateCallbackImpl(callback, crawlerClientFactory, nThreads)) {
try {
final FileListIndexUpdateCallbackImpl fileListIndexUpdateCallback =
new FileListIndexUpdateCallbackImpl(callback, crawlerClientFactory, nThreads);
super.storeData(dataConfig, fileListIndexUpdateCallback, paramMap, scriptMap, defaultDataMap);
fileListIndexUpdateCallback.commit();
} catch (final Exception e) {

View file

@ -42,8 +42,9 @@ public class EsListDataStoreImpl extends EsDataStoreImpl {
}
final CrawlerClientFactory crawlerClientFactory = ComponentUtil.getCrawlerClientFactory();
dataConfig.initializeClientFactory(crawlerClientFactory);
try (final FileListIndexUpdateCallbackImpl fileListIndexUpdateCallback =
new FileListIndexUpdateCallbackImpl(callback, crawlerClientFactory, nThreads)) {
try {
final FileListIndexUpdateCallbackImpl fileListIndexUpdateCallback =
new FileListIndexUpdateCallbackImpl(callback, crawlerClientFactory, nThreads);
super.storeData(dataConfig, fileListIndexUpdateCallback, paramMap, scriptMap, defaultDataMap);
fileListIndexUpdateCallback.commit();
} catch (final Exception e) {

View file

@ -50,14 +50,14 @@ import org.lastaflute.di.core.SingletonLaContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class FileListIndexUpdateCallbackImpl implements IndexUpdateCallback, AutoCloseable {
public class FileListIndexUpdateCallbackImpl implements IndexUpdateCallback {
private static final Logger logger = LoggerFactory.getLogger(FileListIndexUpdateCallbackImpl.class);
protected IndexUpdateCallback indexUpdateCallback;
protected CrawlerClientFactory crawlerClientFactory;
protected List<String> deleteIdList = new ArrayList<>(100);
protected List<String> deleteUrlList = new ArrayList<>(100);
protected int maxDeleteDocumentCacheSize = 100;
@ -65,6 +65,8 @@ public class FileListIndexUpdateCallbackImpl implements IndexUpdateCallback, Aut
private final ExecutorService executor;
private int executorTerminationTimeout = 300;
protected FileListIndexUpdateCallbackImpl(final IndexUpdateCallback indexUpdateCallback,
final CrawlerClientFactory crawlerClientFactory, final int nThreads) {
this.indexUpdateCallback = indexUpdateCallback;
@ -202,18 +204,10 @@ public class FileListIndexUpdateCallbackImpl implements IndexUpdateCallback, Aut
}
synchronized (indexUpdateCallback) {
deleteIdList.add(ComponentUtil.getCrawlingInfoHelper().generateId(dataMap));
deleteUrlList.add(dataMap.get(fessConfig.getIndexFieldUrl()).toString());
if (deleteIdList.size() >= maxDeleteDocumentCacheSize) {
final FessEsClient fessEsClient = ComponentUtil.getFessEsClient();
final IndexingHelper indexingHelper = ComponentUtil.getIndexingHelper();
for (final String id : deleteIdList) {
indexingHelper.deleteDocument(fessEsClient, id);
}
if (logger.isDebugEnabled()) {
logger.debug("Deleted " + deleteIdList);
}
deleteIdList.clear();
if (deleteUrlList.size() >= maxDeleteDocumentCacheSize) {
deleteDocuments();
}
}
@ -222,19 +216,38 @@ public class FileListIndexUpdateCallbackImpl implements IndexUpdateCallback, Aut
@Override
public void commit() {
if (!deleteIdList.isEmpty()) {
final FessEsClient fessEsClient = ComponentUtil.getFessEsClient();
final IndexingHelper indexingHelper = ComponentUtil.getIndexingHelper();
for (final String id : deleteIdList) {
indexingHelper.deleteDocument(fessEsClient, id);
}
try {
if (logger.isDebugEnabled()) {
logger.debug("Deleted " + deleteIdList);
logger.debug("Shutting down thread executor.");
}
executor.shutdown();
executor.awaitTermination(executorTerminationTimeout, TimeUnit.SECONDS);
} catch (InterruptedException e) {
if (logger.isDebugEnabled()) {
logger.debug("Failed to interrupt executor.", e);
}
} finally {
executor.shutdownNow();
}
if (!deleteUrlList.isEmpty()) {
deleteDocuments();
}
indexUpdateCallback.commit();
}
protected void deleteDocuments() {
final FessEsClient fessEsClient = ComponentUtil.getFessEsClient();
final IndexingHelper indexingHelper = ComponentUtil.getIndexingHelper();
for (final String url : deleteUrlList) {
indexingHelper.deleteDocumentByUrl(fessEsClient, url);
}
if (logger.isDebugEnabled()) {
logger.debug("Deleted " + deleteUrlList);
}
deleteUrlList.clear();
}
@Override
public long getDocumentSize() {
return indexUpdateCallback.getDocumentSize();
@ -253,16 +266,8 @@ public class FileListIndexUpdateCallbackImpl implements IndexUpdateCallback, Aut
this.maxRedirectCount = maxRedirectCount;
}
@Override
public void close() throws Exception {
try {
if (logger.isDebugEnabled()) {
logger.debug("Shutting down thread executor.");
}
executor.shutdown();
executor.awaitTermination(60, TimeUnit.SECONDS);
} finally {
executor.shutdownNow();
}
public void setExecutorTerminationTimeout(int executorTerminationTimeout) {
this.executorTerminationTimeout = executorTerminationTimeout;
}
}

View file

@ -116,10 +116,16 @@ public class IndexingHelper {
fessEsClient.delete(fessConfig.getIndexDocumentUpdateIndex(), fessConfig.getIndexDocumentType(), id, 0);
}
public void deleteDocumentsByDocId(final FessEsClient fessEsClient, final List<String> docIdList) {
public int deleteDocumentByUrl(final FessEsClient fessEsClient, final String url) {
final FessConfig fessConfig = ComponentUtil.getFessConfig();
fessEsClient.deleteByQuery(fessConfig.getIndexDocumentUpdateIndex(), fessConfig.getIndexDocumentType(),
QueryBuilders.idsQuery(fessConfig.getIndexDocumentType()).ids(docIdList.stream().toArray(n -> new String[n])));
return fessEsClient.deleteByQuery(fessConfig.getIndexDocumentUpdateIndex(), fessConfig.getIndexDocumentType(),
QueryBuilders.termQuery(fessConfig.getIndexFieldUrl(), url));
}
public int deleteDocumentsByDocId(final FessEsClient fessEsClient, final List<String> docIdList) {
final FessConfig fessConfig = ComponentUtil.getFessConfig();
return fessEsClient.deleteByQuery(fessConfig.getIndexDocumentUpdateIndex(), fessConfig.getIndexDocumentType(), QueryBuilders
.idsQuery(fessConfig.getIndexDocumentType()).ids(docIdList.stream().toArray(n -> new String[n])));
}
public Map<String, Object> getDocument(final FessEsClient fessEsClient, final String id, final String[] fields) {