fix #2827 Add CPU load calibration during search log download to prevent high CPU usage.
This commit is contained in:
parent
5d74bd3c3b
commit
4a010e703d
4 changed files with 171 additions and 80 deletions
|
@ -63,12 +63,18 @@ import org.codelibs.fess.es.log.exbhv.ClickLogBhv;
|
|||
import org.codelibs.fess.es.log.exbhv.FavoriteLogBhv;
|
||||
import org.codelibs.fess.es.log.exbhv.SearchLogBhv;
|
||||
import org.codelibs.fess.es.log.exbhv.UserInfoBhv;
|
||||
import org.codelibs.fess.es.log.exentity.ClickLog;
|
||||
import org.codelibs.fess.es.log.exentity.FavoriteLog;
|
||||
import org.codelibs.fess.es.log.exentity.SearchLog;
|
||||
import org.codelibs.fess.es.log.exentity.UserInfo;
|
||||
import org.codelibs.fess.helper.SystemHelper;
|
||||
import org.codelibs.fess.mylasta.direction.FessConfig;
|
||||
import org.codelibs.fess.util.ComponentUtil;
|
||||
import org.codelibs.fess.util.GsaConfigParser;
|
||||
import org.codelibs.fess.util.RenderDataUtil;
|
||||
import org.codelibs.fess.util.ResourceUtil;
|
||||
import org.codelibs.fess.util.SearchEngineUtil;
|
||||
import org.dbflute.bhv.readable.EntityRowHandler;
|
||||
import org.lastaflute.core.magic.async.AsyncManager;
|
||||
import org.lastaflute.web.Execute;
|
||||
import org.lastaflute.web.response.ActionResponse;
|
||||
|
@ -431,121 +437,157 @@ public class AdminBackupAction extends FessAdminAction {
|
|||
}
|
||||
|
||||
public static Consumer<Writer> getSearchLogNdjsonWriteCall() {
|
||||
final FessConfig fessConfig = ComponentUtil.getFessConfig();
|
||||
final SystemHelper systemHelper = ComponentUtil.getSystemHelper();
|
||||
final long timeout = fessConfig.getIndexBackupLogLoadTimeoutAsInteger().longValue();
|
||||
return writer -> {
|
||||
final SearchLogBhv bhv = ComponentUtil.getComponent(SearchLogBhv.class);
|
||||
bhv.selectCursor(cb -> {
|
||||
cb.query().matchAll();
|
||||
cb.query().addOrderBy_RequestedAt_Asc();
|
||||
}, entity -> {
|
||||
final StringBuilder buf = new StringBuilder();
|
||||
buf.append('{');
|
||||
appendJson("id", entity.getId(), buf).append(',');
|
||||
appendJson("query-id", entity.getQueryId(), buf).append(',');
|
||||
appendJson("user-info-id", entity.getUserInfoId(), buf).append(',');
|
||||
appendJson("user-session-id", entity.getUserSessionId(), buf).append(',');
|
||||
appendJson("user", entity.getUser(), buf).append(',');
|
||||
appendJson("search-word", entity.getSearchWord(), buf).append(',');
|
||||
appendJson("hit-count", entity.getHitCount(), buf).append(',');
|
||||
appendJson("query-page-size", entity.getQueryPageSize(), buf).append(',');
|
||||
appendJson("query-offset", entity.getQueryOffset(), buf).append(',');
|
||||
appendJson("referer", entity.getReferer(), buf).append(',');
|
||||
appendJson("languages", entity.getLanguages(), buf).append(',');
|
||||
appendJson("roles", entity.getRoles(), buf).append(',');
|
||||
appendJson("user-agent", entity.getUserAgent(), buf).append(',');
|
||||
appendJson("client-ip", entity.getClientIp(), buf).append(',');
|
||||
appendJson("access-type", entity.getAccessType(), buf).append(',');
|
||||
appendJson("query-time", entity.getQueryTime(), buf).append(',');
|
||||
appendJson("response-time", entity.getResponseTime(), buf).append(',');
|
||||
appendJson("requested-at", entity.getRequestedAt(), buf).append(',');
|
||||
final Map<String, List<String>> searchFieldMap = entity.getSearchFieldLogList().stream()
|
||||
.collect(Collectors.groupingBy(Pair::getFirst, Collectors.mapping(Pair::getSecond, Collectors.toList())));
|
||||
appendJson("search-field", searchFieldMap, buf).append(',');
|
||||
final Map<String, List<String>> requestHeaderMap = entity.getRequestHeaderList().stream()
|
||||
.collect(Collectors.groupingBy(Pair::getFirst, Collectors.mapping(Pair::getSecond, Collectors.toList())));
|
||||
appendJson("headers", requestHeaderMap, buf);
|
||||
buf.append('}');
|
||||
buf.append('\n');
|
||||
try {
|
||||
writer.write(buf.toString());
|
||||
} catch (final IOException e) {
|
||||
throw new IORuntimeException(e);
|
||||
}, new LogEntityRowHandler<SearchLog>() {
|
||||
@Override
|
||||
public void handle(final SearchLog entity) {
|
||||
final StringBuilder buf = new StringBuilder();
|
||||
buf.append('{');
|
||||
appendJson("id", entity.getId(), buf).append(',');
|
||||
appendJson("query-id", entity.getQueryId(), buf).append(',');
|
||||
appendJson("user-info-id", entity.getUserInfoId(), buf).append(',');
|
||||
appendJson("user-session-id", entity.getUserSessionId(), buf).append(',');
|
||||
appendJson("user", entity.getUser(), buf).append(',');
|
||||
appendJson("search-word", entity.getSearchWord(), buf).append(',');
|
||||
appendJson("hit-count", entity.getHitCount(), buf).append(',');
|
||||
appendJson("query-page-size", entity.getQueryPageSize(), buf).append(',');
|
||||
appendJson("query-offset", entity.getQueryOffset(), buf).append(',');
|
||||
appendJson("referer", entity.getReferer(), buf).append(',');
|
||||
appendJson("languages", entity.getLanguages(), buf).append(',');
|
||||
appendJson("roles", entity.getRoles(), buf).append(',');
|
||||
appendJson("user-agent", entity.getUserAgent(), buf).append(',');
|
||||
appendJson("client-ip", entity.getClientIp(), buf).append(',');
|
||||
appendJson("access-type", entity.getAccessType(), buf).append(',');
|
||||
appendJson("query-time", entity.getQueryTime(), buf).append(',');
|
||||
appendJson("response-time", entity.getResponseTime(), buf).append(',');
|
||||
appendJson("requested-at", entity.getRequestedAt(), buf).append(',');
|
||||
final Map<String, List<String>> searchFieldMap = entity.getSearchFieldLogList().stream()
|
||||
.collect(Collectors.groupingBy(Pair::getFirst, Collectors.mapping(Pair::getSecond, Collectors.toList())));
|
||||
appendJson("search-field", searchFieldMap, buf).append(',');
|
||||
final Map<String, List<String>> requestHeaderMap = entity.getRequestHeaderList().stream()
|
||||
.collect(Collectors.groupingBy(Pair::getFirst, Collectors.mapping(Pair::getSecond, Collectors.toList())));
|
||||
appendJson("headers", requestHeaderMap, buf);
|
||||
buf.append('}');
|
||||
buf.append('\n');
|
||||
try {
|
||||
writer.write(buf.toString());
|
||||
} catch (final IOException e) {
|
||||
throw new IORuntimeException(e);
|
||||
}
|
||||
if (!systemHelper.calibrateCpuLoad(timeout)) {
|
||||
breakCursor = true;
|
||||
}
|
||||
}
|
||||
});
|
||||
};
|
||||
}
|
||||
|
||||
public static Consumer<Writer> getUserInfoNdjsonWriteCall() {
|
||||
final FessConfig fessConfig = ComponentUtil.getFessConfig();
|
||||
final SystemHelper systemHelper = ComponentUtil.getSystemHelper();
|
||||
final long timeout = fessConfig.getIndexBackupLogLoadTimeoutAsInteger().longValue();
|
||||
return writer -> {
|
||||
final UserInfoBhv bhv = ComponentUtil.getComponent(UserInfoBhv.class);
|
||||
bhv.selectCursor(cb -> {
|
||||
cb.query().matchAll();
|
||||
cb.query().addOrderBy_CreatedAt_Asc();
|
||||
}, entity -> {
|
||||
final StringBuilder buf = new StringBuilder();
|
||||
buf.append('{');
|
||||
appendJson("id", entity.getId(), buf).append(',');
|
||||
appendJson("created-at", entity.getCreatedAt(), buf).append(',');
|
||||
appendJson("updated-at", entity.getUpdatedAt(), buf);
|
||||
buf.append('}');
|
||||
buf.append('\n');
|
||||
try {
|
||||
writer.write(buf.toString());
|
||||
} catch (final IOException e) {
|
||||
throw new IORuntimeException(e);
|
||||
}, new LogEntityRowHandler<UserInfo>() {
|
||||
@Override
|
||||
public void handle(final UserInfo entity) {
|
||||
final StringBuilder buf = new StringBuilder();
|
||||
buf.append('{');
|
||||
appendJson("id", entity.getId(), buf).append(',');
|
||||
appendJson("created-at", entity.getCreatedAt(), buf).append(',');
|
||||
appendJson("updated-at", entity.getUpdatedAt(), buf);
|
||||
buf.append('}');
|
||||
buf.append('\n');
|
||||
try {
|
||||
writer.write(buf.toString());
|
||||
} catch (final IOException e) {
|
||||
throw new IORuntimeException(e);
|
||||
}
|
||||
if (!systemHelper.calibrateCpuLoad(timeout)) {
|
||||
breakCursor = true;
|
||||
}
|
||||
}
|
||||
});
|
||||
};
|
||||
}
|
||||
|
||||
public static Consumer<Writer> getFavoriteLogNdjsonWriteCall() {
|
||||
final FessConfig fessConfig = ComponentUtil.getFessConfig();
|
||||
final SystemHelper systemHelper = ComponentUtil.getSystemHelper();
|
||||
final long timeout = fessConfig.getIndexBackupLogLoadTimeoutAsInteger().longValue();
|
||||
return writer -> {
|
||||
final FavoriteLogBhv bhv = ComponentUtil.getComponent(FavoriteLogBhv.class);
|
||||
bhv.selectCursor(cb -> {
|
||||
cb.query().matchAll();
|
||||
cb.query().addOrderBy_CreatedAt_Asc();
|
||||
}, entity -> {
|
||||
final StringBuilder buf = new StringBuilder();
|
||||
buf.append('{');
|
||||
appendJson("id", entity.getId(), buf).append(',');
|
||||
appendJson("created-at", entity.getCreatedAt(), buf).append(',');
|
||||
appendJson("query-id", entity.getQueryId(), buf).append(',');
|
||||
appendJson("user-info-id", entity.getUserInfoId(), buf).append(',');
|
||||
appendJson("doc-id", entity.getDocId(), buf).append(',');
|
||||
appendJson("url", entity.getUrl(), buf);
|
||||
buf.append('}');
|
||||
buf.append('\n');
|
||||
try {
|
||||
writer.write(buf.toString());
|
||||
} catch (final IOException e) {
|
||||
throw new IORuntimeException(e);
|
||||
}, new LogEntityRowHandler<FavoriteLog>() {
|
||||
@Override
|
||||
public void handle(final FavoriteLog entity) {
|
||||
final StringBuilder buf = new StringBuilder();
|
||||
buf.append('{');
|
||||
appendJson("id", entity.getId(), buf).append(',');
|
||||
appendJson("created-at", entity.getCreatedAt(), buf).append(',');
|
||||
appendJson("query-id", entity.getQueryId(), buf).append(',');
|
||||
appendJson("user-info-id", entity.getUserInfoId(), buf).append(',');
|
||||
appendJson("doc-id", entity.getDocId(), buf).append(',');
|
||||
appendJson("url", entity.getUrl(), buf);
|
||||
buf.append('}');
|
||||
buf.append('\n');
|
||||
try {
|
||||
writer.write(buf.toString());
|
||||
} catch (final IOException e) {
|
||||
throw new IORuntimeException(e);
|
||||
}
|
||||
if (!systemHelper.calibrateCpuLoad(timeout)) {
|
||||
breakCursor = true;
|
||||
}
|
||||
}
|
||||
});
|
||||
};
|
||||
}
|
||||
|
||||
public static Consumer<Writer> getClickLogNdjsonWriteCall() {
|
||||
final FessConfig fessConfig = ComponentUtil.getFessConfig();
|
||||
final SystemHelper systemHelper = ComponentUtil.getSystemHelper();
|
||||
final long timeout = fessConfig.getIndexBackupLogLoadTimeoutAsInteger().longValue();
|
||||
return writer -> {
|
||||
final ClickLogBhv bhv = ComponentUtil.getComponent(ClickLogBhv.class);
|
||||
bhv.selectCursor(cb -> {
|
||||
cb.query().matchAll();
|
||||
cb.query().addOrderBy_RequestedAt_Asc();
|
||||
}, entity -> {
|
||||
final StringBuilder buf = new StringBuilder();
|
||||
buf.append('{');
|
||||
appendJson("id", entity.getId(), buf).append(',');
|
||||
appendJson("query-id", entity.getQueryId(), buf).append(',');
|
||||
appendJson("user-session-id", entity.getUserSessionId(), buf).append(',');
|
||||
appendJson("doc-id", entity.getDocId(), buf).append(',');
|
||||
appendJson("url", entity.getUrl(), buf).append(',');
|
||||
appendJson("order", entity.getOrder(), buf).append(',');
|
||||
appendJson("query-requested-at", entity.getQueryRequestedAt(), buf).append(',');
|
||||
appendJson("requested-at", entity.getRequestedAt(), buf);
|
||||
buf.append('}');
|
||||
buf.append('\n');
|
||||
try {
|
||||
writer.write(buf.toString());
|
||||
} catch (final IOException e) {
|
||||
throw new IORuntimeException(e);
|
||||
}, new LogEntityRowHandler<ClickLog>() {
|
||||
@Override
|
||||
public void handle(final ClickLog entity) {
|
||||
final StringBuilder buf = new StringBuilder();
|
||||
buf.append('{');
|
||||
appendJson("id", entity.getId(), buf).append(',');
|
||||
appendJson("query-id", entity.getQueryId(), buf).append(',');
|
||||
appendJson("user-session-id", entity.getUserSessionId(), buf).append(',');
|
||||
appendJson("doc-id", entity.getDocId(), buf).append(',');
|
||||
appendJson("url", entity.getUrl(), buf).append(',');
|
||||
appendJson("order", entity.getOrder(), buf).append(',');
|
||||
appendJson("query-requested-at", entity.getQueryRequestedAt(), buf).append(',');
|
||||
appendJson("requested-at", entity.getRequestedAt(), buf);
|
||||
buf.append('}');
|
||||
buf.append('\n');
|
||||
try {
|
||||
writer.write(buf.toString());
|
||||
} catch (final IOException e) {
|
||||
throw new IORuntimeException(e);
|
||||
}
|
||||
if (!systemHelper.calibrateCpuLoad(timeout)) {
|
||||
breakCursor = true;
|
||||
}
|
||||
}
|
||||
});
|
||||
};
|
||||
|
@ -572,4 +614,12 @@ public class AdminBackupAction extends FessAdminAction {
|
|||
}
|
||||
}
|
||||
|
||||
private static abstract class LogEntityRowHandler<ENTITY> implements EntityRowHandler<ENTITY> {
|
||||
protected boolean breakCursor = false;
|
||||
|
||||
@Override
|
||||
public boolean isBreakCursor() {
|
||||
return breakCursor;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -601,19 +601,31 @@ public class SystemHelper {
|
|||
}
|
||||
}
|
||||
|
||||
public void calibrateCpuLoad() {
|
||||
public boolean calibrateCpuLoad() {
|
||||
return calibrateCpuLoad(0L);
|
||||
}
|
||||
|
||||
public boolean calibrateCpuLoad(final long timeoutInMillis) {
|
||||
final short percent = ComponentUtil.getFessConfig().getAdaptiveLoadControlAsInteger().shortValue();
|
||||
if (percent <= 0) {
|
||||
return;
|
||||
return true;
|
||||
}
|
||||
short current = getSystemCpuPercent();
|
||||
if (current < percent) {
|
||||
return;
|
||||
return true;
|
||||
}
|
||||
final long startTime = getCurrentTimeAsLong();
|
||||
final String threadName = Thread.currentThread().getName();
|
||||
try {
|
||||
waitingThreadNames.add(threadName);
|
||||
while (current >= percent) {
|
||||
if (timeoutInMillis > 0 && getCurrentTimeAsLong() - startTime > timeoutInMillis) {
|
||||
if (logger.isInfoEnabled()) {
|
||||
logger.info("Cpu Load {}% is greater than {}%. {} waiting thread(s). {} thread is timed out.", current, percent,
|
||||
waitingThreadNames.size(), threadName);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
if (logger.isInfoEnabled()) {
|
||||
logger.info("Cpu Load {}% is greater than {}%. {} waiting thread(s).", current, percent, waitingThreadNames.size());
|
||||
}
|
||||
|
@ -626,6 +638,7 @@ public class SystemHelper {
|
|||
} finally {
|
||||
waitingThreadNames.remove(threadName);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
public void waitForNoWaitingThreads() {
|
||||
|
|
|
@ -1176,6 +1176,9 @@ public interface FessConfig extends FessEnv, org.codelibs.fess.mylasta.direction
|
|||
/** The key of the configuration. e.g. click_log.ndjson,favorite_log.ndjson,search_log.ndjson,user_info.ndjson */
|
||||
String INDEX_BACKUP_LOG_TARGETS = "index.backup.log.targets";
|
||||
|
||||
/** The key of the configuration. e.g. 60000 */
|
||||
String INDEX_BACKUP_LOG_LOAD_TIMEOUT = "index.backup.log.load.timeout";
|
||||
|
||||
/** The key of the configuration. e.g. true */
|
||||
String LOGGING_SEARCH_DOCS_ENABLED = "logging.search.docs.enabled";
|
||||
|
||||
|
@ -5505,6 +5508,21 @@ public interface FessConfig extends FessEnv, org.codelibs.fess.mylasta.direction
|
|||
*/
|
||||
String getIndexBackupLogTargets();
|
||||
|
||||
/**
|
||||
* Get the value for the key 'index.backup.log.load.timeout'. <br>
|
||||
* The value is, e.g. 60000 <br>
|
||||
* @return The value of found property. (NotNull: if not found, exception but basically no way)
|
||||
*/
|
||||
String getIndexBackupLogLoadTimeout();
|
||||
|
||||
/**
|
||||
* Get the value for the key 'index.backup.log.load.timeout' as {@link Integer}. <br>
|
||||
* The value is, e.g. 60000 <br>
|
||||
* @return The value of found property. (NotNull: if not found, exception but basically no way)
|
||||
* @throws NumberFormatException When the property is not integer.
|
||||
*/
|
||||
Integer getIndexBackupLogLoadTimeoutAsInteger();
|
||||
|
||||
/**
|
||||
* Get the value for the key 'logging.search.docs.enabled'. <br>
|
||||
* The value is, e.g. true <br>
|
||||
|
@ -9611,6 +9629,14 @@ public interface FessConfig extends FessEnv, org.codelibs.fess.mylasta.direction
|
|||
return get(FessConfig.INDEX_BACKUP_LOG_TARGETS);
|
||||
}
|
||||
|
||||
public String getIndexBackupLogLoadTimeout() {
|
||||
return get(FessConfig.INDEX_BACKUP_LOG_LOAD_TIMEOUT);
|
||||
}
|
||||
|
||||
public Integer getIndexBackupLogLoadTimeoutAsInteger() {
|
||||
return getAsInteger(FessConfig.INDEX_BACKUP_LOG_LOAD_TIMEOUT);
|
||||
}
|
||||
|
||||
public String getLoggingSearchDocsEnabled() {
|
||||
return get(FessConfig.LOGGING_SEARCH_DOCS_ENABLED);
|
||||
}
|
||||
|
@ -11155,6 +11181,7 @@ public interface FessConfig extends FessEnv, org.codelibs.fess.mylasta.direction
|
|||
defaultMap.put(FessConfig.INDEX_BACKUP_TARGETS,
|
||||
"fess_basic_config.bulk,fess_config.bulk,fess_user.bulk,system.properties,fess.json,doc.json");
|
||||
defaultMap.put(FessConfig.INDEX_BACKUP_LOG_TARGETS, "click_log.ndjson,favorite_log.ndjson,search_log.ndjson,user_info.ndjson");
|
||||
defaultMap.put(FessConfig.INDEX_BACKUP_LOG_LOAD_TIMEOUT, "60000");
|
||||
defaultMap.put(FessConfig.LOGGING_SEARCH_DOCS_ENABLED, "true");
|
||||
defaultMap.put(FessConfig.LOGGING_SEARCH_DOCS_FIELDS,
|
||||
"filetype,created,click_count,title,doc_id,url,score,site,filename,host,digest,boost,mimetype,favorite_count,_id,lang,last_modified,content_length,timestamp");
|
||||
|
|
|
@ -621,6 +621,7 @@ ftp.role.from.file=true
|
|||
# backup
|
||||
index.backup.targets=fess_basic_config.bulk,fess_config.bulk,fess_user.bulk,system.properties,fess.json,doc.json
|
||||
index.backup.log.targets=click_log.ndjson,favorite_log.ndjson,search_log.ndjson,user_info.ndjson
|
||||
index.backup.log.load.timeout=60000
|
||||
|
||||
# logging
|
||||
logging.search.docs.enabled=true
|
||||
|
|
Loading…
Add table
Reference in a new issue