#2640 multithread support

This commit is contained in:
Shinsuke Sugaya 2022-04-14 12:21:35 +09:00
parent 0bc1848738
commit 072331b927
3 changed files with 168 additions and 28 deletions

View file

@ -94,17 +94,29 @@ public class FileListIndexUpdateCallbackImpl implements IndexUpdateCallback {
@Override
public void store(final DataStoreParams paramMap, final Map<String, Object> dataMap) {
final CrawlerStatsHelper crawlerStatsHelper = ComponentUtil.getCrawlerStatsHelper();
final StatsKeyObject keyObj = paramMap.get(Constants.CRAWLER_STATS_KEY) instanceof StatsKeyObject sko ? sko : null;
if (keyObj != null) {
crawlerStatsHelper.runOnThread(keyObj);
}
final DataStoreParams localParams = paramMap.newInstance();
executor.execute(() -> {
final Object eventType = dataMap.remove(getParamValue(paramMap, "field.event_type", "event_type"));
if (getParamValue(paramMap, "event.create", "create").equals(eventType)
|| getParamValue(paramMap, "event.modify", "modify").equals(eventType)) {
// updated file
addDocument(paramMap, dataMap);
} else if (getParamValue(paramMap, "event.delete", "delete").equals(eventType)) {
// deleted file
deleteDocument(paramMap, dataMap);
} else {
logger.warn("unknown event: {}, data: {}", eventType, dataMap);
try {
final Object eventType = dataMap.remove(getParamValue(localParams, "field.event_type", "event_type"));
if (getParamValue(localParams, "event.create", "create").equals(eventType)
|| getParamValue(localParams, "event.modify", "modify").equals(eventType)) {
// updated file
addDocument(localParams, dataMap);
} else if (getParamValue(localParams, "event.delete", "delete").equals(eventType)) {
// deleted file
deleteDocument(localParams, dataMap);
} else {
logger.warn("unknown event: {}, data: {}", eventType, dataMap);
}
} finally {
if (keyObj != null) {
crawlerStatsHelper.done(keyObj);
}
}
});
}

View file

@ -18,10 +18,11 @@ package org.codelibs.fess.helper;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -38,9 +39,11 @@ import com.google.common.cache.LoadingCache;
*
*/
public class CrawlerStatsHelper {
private static final Logger logger = LogManager.getLogger(CrawlerStatsHelper.class);
private static final String BEGIN_KEY = "begin";
protected Logger logger = null;
protected Logger statsLogger = null;
protected String loggerName = "fess.log.crawler.stats";
@ -48,22 +51,35 @@ public class CrawlerStatsHelper {
protected long cacheExpireAfterWrite = 10 * 60 * 1000L;
protected LoadingCache<String, Map<String, Long>> statsCache;
protected LoadingCache<String, StatsObject> statsCache;
@PostConstruct
public void init() {
logger = LogManager.getLogger(loggerName);
statsLogger = LogManager.getLogger(loggerName);
statsCache = CacheBuilder.newBuilder().maximumSize(maxCacheSize).expireAfterWrite(cacheExpireAfterWrite, TimeUnit.MILLISECONDS)
.build(new CacheLoader<String, Map<String, Long>>() {
.build(new CacheLoader<String, StatsObject>() {
@Override
public Map<String, Long> load(String key) {
Map<String, Long> map = new LinkedHashMap<>();
map.put(BEGIN_KEY, System.currentTimeMillis());
return map;
public StatsObject load(String key) {
return new StatsObject();
}
});
}
/**
 * Prints the statistics that are still held in the cache when this helper is destroyed.
 *
 * For every cached entry that still carries its begin timestamp, the timestamp is
 * removed from the entry and the remaining actions are printed without a
 * {@code done:} field.
 */
@PreDestroy
public void destroy() {
    if (logger.isDebugEnabled()) {
        logger.debug("cache stats: {}", statsCache.stats());
    }
    for (final Map.Entry<String, StatsObject> entry : statsCache.asMap().entrySet()) {
        final StatsObject stats = entry.getValue();
        // the begin timestamp is taken out so that it is not printed as an action
        final Long beginTime = stats.remove(BEGIN_KEY);
        if (beginTime == null) {
            continue;
        }
        printStats(entry.getKey(), stats, beginTime.longValue(), false);
    }
}
public void begin(final Object keyObj) {
getCacheKey(keyObj).ifPresent(key -> {
try {
@ -84,7 +100,7 @@ public class CrawlerStatsHelper {
public void record(final Object keyObj, final String action) {
getCacheKey(keyObj).ifPresent(key -> {
try {
Map<String, Long> data = statsCache.getIfPresent(key);
final StatsObject data = statsCache.getIfPresent(key);
if (data != null) {
data.put(escapeValue(action), System.currentTimeMillis());
}
@ -100,16 +116,12 @@ public class CrawlerStatsHelper {
public void done(final Object keyObj) {
getCacheKey(keyObj).ifPresent(key -> {
try {
final Map<String, Long> data = statsCache.getIfPresent(key);
if (data != null) {
final StatsObject data = statsCache.getIfPresent(key);
if (data != null && data.decrement() <= 0) {
statsCache.invalidate(key);
final Long begin = data.remove(BEGIN_KEY);
if (begin != null) {
final StringBuilder buf = createStringBuffer(keyObj, begin.longValue());
buf.append('\t').append("done:").append(System.currentTimeMillis() - begin.longValue());
data.entrySet().stream().map(e -> escapeValue(e.getKey()) + ":" + (e.getValue().longValue() - begin.longValue()))
.map(s -> "\t" + s).forEach(s -> buf.append(s));
log(buf);
printStats(keyObj, data, begin.longValue(), true);
}
}
} catch (Exception e) {
@ -121,6 +133,48 @@ public class CrawlerStatsHelper {
});
}
/**
 * Drops the statistics collected for the given key without printing them.
 *
 * Nothing happens when {@code getCacheKey} yields no key for {@code keyObj}.
 * If removing the entry fails, an error line labelled {@code action:discard}
 * is written through {@code log}.
 *
 * @param keyObj the object identifying the statistics entry to drop
 */
public void discard(final Object keyObj) {
    getCacheKey(keyObj).ifPresent(key -> {
        try {
            // invalidate() simply does nothing for a key that is not cached,
            // so no lookup is required beforehand
            statsCache.invalidate(key);
        } catch (final Exception e) {
            final StringBuilder buf = createStringBuffer(keyObj, System.currentTimeMillis());
            // label the failure with this operation (it was reported as "done" before)
            buf.append('\t').append("action:discard");
            buf.append('\t').append("error:").append(escapeValue(e.getLocalizedMessage()).replaceAll("\\s", " "));
            log(buf);
        }
    });
}
/**
 * Formats one statistics line and passes it to {@code log}.
 *
 * The line starts with the prefix produced by {@code createStringBuffer}. When
 * {@code done} is true a {@code done:<elapsed millis>} field follows. After that,
 * every entry of {@code data} is appended as {@code <escaped name>:<value - begin>}.
 * All fields are separated by a tab.
 *
 * @param keyObj the object identifying the statistics entry
 * @param data the recorded actions and their timestamps
 * @param begin the begin timestamp the recorded values are made relative to
 * @param done whether the elapsed time since {@code begin} is printed as {@code done:}
 */
protected void printStats(final Object keyObj, final StatsObject data, final long begin, final boolean done) {
    final StringBuilder line = createStringBuffer(keyObj, begin);
    if (done) {
        final long elapsed = System.currentTimeMillis() - begin;
        line.append('\t').append("done:").append(elapsed);
    }
    for (final Map.Entry<String, Long> action : data.entrySet()) {
        line.append('\t').append(escapeValue(action.getKey())).append(":").append(action.getValue().longValue() - begin);
    }
    log(line);
}
/**
 * Increments the counter of the cached statistics entry for the given key.
 *
 * {@code done} only prints and removes an entry once its counter has been
 * decremented to zero or below, so each call made here requires one additional
 * {@code done} call for the same key. Nothing happens when no key can be derived
 * from {@code keyObj} or when no entry is cached for it.
 *
 * @param keyObj the object identifying the statistics entry
 */
public void runOnThread(final Object keyObj) {
    getCacheKey(keyObj).ifPresent(key -> {
        try {
            final StatsObject data = statsCache.getIfPresent(key);
            if (data != null) {
                data.increment();
            }
        } catch (final Exception e) {
            final StringBuilder buf = createStringBuffer(keyObj, System.currentTimeMillis());
            // label the failure with this operation (it was reported as "record" before)
            buf.append('\t').append("action:runOnThread");
            buf.append('\t').append("error:").append(escapeValue(e.getLocalizedMessage()).replaceAll("\\s", " "));
            log(buf);
        }
    });
}
private StringBuilder createStringBuffer(final Object keyObj, final long time) {
final StringBuilder buf = new StringBuilder(1000);
buf.append("url:").append(getUrl(keyObj));
@ -160,7 +214,7 @@ public class CrawlerStatsHelper {
}
protected void log(final StringBuilder buf) {
logger.info(buf.toString());
statsLogger.info(buf.toString());
}
public void setLoggerName(String loggerName) {
@ -201,6 +255,25 @@ public class CrawlerStatsHelper {
}
}
/**
 * Statistics of a single key: an insertion-ordered map from action name to
 * timestamp, combined with a counter.
 *
 * A new instance already contains the begin timestamp under {@code BEGIN_KEY}
 * and its counter starts at 1.
 */
public static class StatsObject extends LinkedHashMap<String, Long> {
    private static final long serialVersionUID = 1L;

    /** Counter changed by {@link #increment()} and {@link #decrement()}; starts at 1. */
    protected final AtomicInteger count = new AtomicInteger(1);

    /**
     * Creates the entry and stores the current time as its begin timestamp.
     */
    public StatsObject() {
        put(BEGIN_KEY, System.currentTimeMillis());
    }

    /**
     * Adds one to the counter.
     *
     * @return the counter value after the increment
     */
    public int increment() {
        return count.incrementAndGet();
    }

    /**
     * Subtracts one from the counter.
     *
     * @return the counter value after the decrement
     */
    public int decrement() {
        return count.decrementAndGet();
    }
}
public enum StatsAction {
ACCESSED, //
ACCESS_EXCEPTION, //

View file

@ -112,4 +112,59 @@ public class CrawlerStatsHelperTest extends UnitFessTestCase {
crawlerStatsHelper.done(key);
assertNull(localLogMsg.get());
}
/**
 * After one runOnThread() call the first done() must stay silent and only the
 * second done() prints the stats line; any further done() is silent again.
 */
public void test_beginDoneWithRecordOnLazy() {
    final String statsKey = "test";
    crawlerStatsHelper.begin(statsKey);
    crawlerStatsHelper.record(statsKey, "aaa");
    crawlerStatsHelper.runOnThread(statsKey);

    // first done(): nothing is printed yet
    crawlerStatsHelper.done(statsKey);
    logger.info(localLogMsg.get());
    assertNull(localLogMsg.get());

    // second done(): the line is printed
    crawlerStatsHelper.done(statsKey);
    logger.info(localLogMsg.get());
    final String[] fields = localLogMsg.get().split("\t");
    assertEquals(4, fields.length);
    assertEquals("url:test", fields[0]);
    assertTrue(fields[1].startsWith("time:"));
    assertTrue(fields[2].startsWith("done:"));
    assertTrue(fields[3].startsWith("aaa:"));

    // the entry is gone now, so another done() prints nothing
    localLogMsg.remove();
    crawlerStatsHelper.done(statsKey);
    assertNull(localLogMsg.get());
}
/**
 * discard() must not print anything, and a done() after it must not print either.
 */
public void test_beginWithRecordAndDiscard() {
    final String statsKey = "test";
    crawlerStatsHelper.begin(statsKey);
    crawlerStatsHelper.record(statsKey, "aaa");

    // discarding prints nothing
    crawlerStatsHelper.discard(statsKey);
    logger.info(localLogMsg.get());
    assertNull(localLogMsg.get());

    // done() after the discard prints nothing
    crawlerStatsHelper.done(statsKey);
    logger.info(localLogMsg.get());
    assertNull(localLogMsg.get());
    localLogMsg.remove();
}
/**
 * An entry that was begun but never finished is printed by destroy(),
 * without a "done:" field.
 */
public void test_beginWithRecordOnDestroy() {
    final String statsKey = "test";
    crawlerStatsHelper.begin(statsKey);
    crawlerStatsHelper.record(statsKey, "aaa");

    // nothing is printed before destroy()
    logger.info(localLogMsg.get());
    assertNull(localLogMsg.get());

    // destroy() prints the pending entry
    crawlerStatsHelper.destroy();
    logger.info(localLogMsg.get());
    final String[] fields = localLogMsg.get().split("\t");
    assertEquals(3, fields.length);
    assertEquals("url:test", fields[0]);
    assertTrue(fields[1].startsWith("time:"));
    assertTrue(fields[2].startsWith("aaa:"));
    localLogMsg.remove();
}
}