fix #2862 Optimize SearchLogHelper for direct queue handling and streamlined batch processing.

This commit is contained in:
Shinsuke Sugaya 2024-12-27 22:58:21 +09:00
parent 80e9cb0ddf
commit cfa492eb8e

View file

@ -74,9 +74,9 @@ public class SearchLogHelper {
protected int userInfoCacheSize = 10000;
protected volatile Queue<SearchLog> searchLogQueue = new ConcurrentLinkedQueue<>();
protected Queue<SearchLog> searchLogQueue = new ConcurrentLinkedQueue<>();
protected volatile Queue<ClickLog> clickLogQueue = new ConcurrentLinkedQueue<>();
protected Queue<ClickLog> clickLogQueue = new ConcurrentLinkedQueue<>();
protected LoadingCache<String, UserInfo> userInfoCache;
@ -203,16 +203,19 @@ public class SearchLogHelper {
}
public void storeSearchLog() {
if (!searchLogQueue.isEmpty()) {
final Queue<SearchLog> queue = searchLogQueue;
searchLogQueue = new ConcurrentLinkedQueue<>();
processSearchLogQueue(queue);
}
storeSearchLogFromQueue();
storeClickLogFromQueue();
}
protected void storeClickLogFromQueue() {
if (!clickLogQueue.isEmpty()) {
final Queue<ClickLog> queue = clickLogQueue;
clickLogQueue = new ConcurrentLinkedQueue<>();
processClickLogQueue(queue);
processClickLogQueue(clickLogQueue);
}
}
protected void storeSearchLogFromQueue() {
if (!searchLogQueue.isEmpty()) {
processSearchLogQueue(searchLogQueue);
}
}
@ -271,27 +274,40 @@ public class SearchLogHelper {
botNames = value.split(",");
}
final int batchSize = ComponentUtil.getFessConfig().getSearchlogProcessBatchSizeAsInteger();
final List<SearchLog> searchLogList = new ArrayList<>();
final Map<String, UserInfo> userInfoMap = new HashMap<>();
queue.stream().forEach(searchLog -> {
final String userAgent = searchLog.getUserAgent();
final boolean isBot =
userAgent != null && stream(botNames).get(stream -> stream.anyMatch(botName -> userAgent.indexOf(botName) >= 0));
if (!isBot) {
searchLog.getUserInfo().ifPresent(userInfo -> {
final String code = userInfo.getId();
final UserInfo oldUserInfo = userInfoMap.get(code);
if (oldUserInfo != null) {
userInfo.setCreatedAt(oldUserInfo.getCreatedAt());
}
userInfoMap.put(code, userInfo);
});
searchLogList.add(searchLog);
while (!queue.isEmpty()) {
final SearchLog searchLog = queue.poll();
if (searchLog != null) {
final String userAgent = searchLog.getUserAgent();
final boolean isBot =
userAgent != null && stream(botNames).get(stream -> stream.anyMatch(botName -> userAgent.indexOf(botName) >= 0));
if (!isBot) {
searchLog.getUserInfo().ifPresent(userInfo -> {
final String code = userInfo.getId();
final UserInfo oldUserInfo = userInfoMap.get(code);
if (oldUserInfo != null) {
userInfo.setCreatedAt(oldUserInfo.getCreatedAt());
}
userInfoMap.put(code, userInfo);
});
searchLogList.add(searchLog);
}
}
});
if (searchLogList.size() >= batchSize) {
processUserInfoLog(searchLogList, userInfoMap);
processSearchLog(searchLogList);
searchLogList.clear();
userInfoMap.clear();
}
}
processUserInfoLog(searchLogList, userInfoMap);
processSearchLog(searchLogList);
if (!searchLogList.isEmpty()) {
processUserInfoLog(searchLogList, userInfoMap);
processSearchLog(searchLogList);
}
}
private void processSearchLog(final List<SearchLog> searchLogList) {
@ -341,47 +357,52 @@ public class SearchLogHelper {
protected void storeSearchLogList(final List<SearchLog> searchLogList) {
final SearchLogBhv searchLogBhv = ComponentUtil.getComponent(SearchLogBhv.class);
final int batchSize = ComponentUtil.getFessConfig().getSearchlogProcessBatchSizeAsInteger();
final int totalSize = searchLogList.size();
for (int i = 0; i < totalSize; i += batchSize) {
final int end = Math.min(totalSize, i + batchSize);
if (logger.isDebugEnabled()) {
logger.debug("Sending {} search logs. ({}-{}/{})", end - i, i, end, totalSize);
}
searchLogBhv.batchUpdate(searchLogList.subList(i, end), op -> {
op.setRefreshPolicy(Constants.TRUE);
});
}
searchLogBhv.batchUpdate(searchLogList, op -> {
op.setRefreshPolicy(Constants.TRUE);
});
}
protected void processClickLogQueue(final Queue<ClickLog> queue) {
final FessConfig fessConfig = ComponentUtil.getFessConfig();
final int batchSize = fessConfig.getSearchlogProcessBatchSizeAsInteger();
final Map<String, Integer> clickCountMap = new HashMap<>();
final List<ClickLog> clickLogList = new ArrayList<>();
for (final ClickLog clickLog : queue) {
try {
final SearchLogBhv searchLogBhv = ComponentUtil.getComponent(SearchLogBhv.class);
searchLogBhv.selectEntity(cb -> {
cb.query().setQueryId_Equal(clickLog.getQueryId());
}).ifPresent(entity -> {
clickLogList.add(clickLog);
final String docId = clickLog.getDocId();
Integer countObj = clickCountMap.get(docId);
if (countObj == null) {
countObj = 1;
} else {
countObj = countObj.intValue() + 1;
}
clickCountMap.put(docId, countObj);
}).orElse(() -> {
logger.warn("Not Found for SearchLog: {}", clickLog);
});
} catch (final Exception e) {
logger.warn("Failed to process: {}", clickLog, e);
while (!queue.isEmpty()) {
final ClickLog clickLog = queue.poll();
if (clickLog != null) {
try {
final SearchLogBhv searchLogBhv = ComponentUtil.getComponent(SearchLogBhv.class);
searchLogBhv.selectEntity(cb -> {
cb.query().setQueryId_Equal(clickLog.getQueryId());
}).ifPresent(entity -> {
clickLogList.add(clickLog);
final String docId = clickLog.getDocId();
Integer countObj = clickCountMap.get(docId);
if (countObj == null) {
countObj = 1;
} else {
countObj = countObj.intValue() + 1;
}
clickCountMap.put(docId, countObj);
}).orElse(() -> {
logger.warn("Not Found for SearchLog: {}", clickLog);
});
} catch (final Exception e) {
logger.warn("Failed to process: {}", clickLog, e);
}
}
if (clickLogList.size() >= batchSize) {
processClickLog(clickLogList);
updateClickFieldInIndex(clickCountMap);
clickLogList.clear();
clickCountMap.clear();
}
}
processClickLog(clickLogList);
updateClickFieldInIndex(clickCountMap);
if (!clickLogList.isEmpty()) {
processClickLog(clickLogList);
updateClickFieldInIndex(clickCountMap);
}
}
protected void updateClickFieldInIndex(final Map<String, Integer> clickCountMap) {
@ -424,15 +445,7 @@ public class SearchLogHelper {
final FessConfig fessConfig = ComponentUtil.getFessConfig();
try {
final ClickLogBhv clickLogBhv = ComponentUtil.getComponent(ClickLogBhv.class);
final int batchSize = fessConfig.getSearchlogProcessBatchSizeAsInteger();
final int totalSize = clickLogList.size();
for (int i = 0; i < totalSize; i += batchSize) {
final int end = Math.min(totalSize, i + batchSize);
if (logger.isDebugEnabled()) {
logger.debug("Sending {} click logs. ({}-{}/{})", end - i, i, end, totalSize);
}
clickLogBhv.batchInsert(clickLogList.subList(i, end));
}
clickLogBhv.batchInsert(clickLogList);
} catch (final Exception e) {
logger.warn("Failed to insert: {}", clickLogList, e);
}