浏览代码

fix #2827 Add CPU load calibration during search log download to prevent high CPU usage.

Shinsuke Sugaya 1 年之前
父节点
当前提交
a21cea6ffe

+ 127 - 77
src/main/java/org/codelibs/fess/app/web/admin/backup/AdminBackupAction.java

@@ -61,12 +61,18 @@ import org.codelibs.fess.es.log.exbhv.ClickLogBhv;
 import org.codelibs.fess.es.log.exbhv.FavoriteLogBhv;
 import org.codelibs.fess.es.log.exbhv.SearchLogBhv;
 import org.codelibs.fess.es.log.exbhv.UserInfoBhv;
+import org.codelibs.fess.es.log.exentity.ClickLog;
+import org.codelibs.fess.es.log.exentity.FavoriteLog;
+import org.codelibs.fess.es.log.exentity.SearchLog;
+import org.codelibs.fess.es.log.exentity.UserInfo;
+import org.codelibs.fess.helper.SystemHelper;
 import org.codelibs.fess.mylasta.direction.FessConfig;
 import org.codelibs.fess.util.ComponentUtil;
 import org.codelibs.fess.util.GsaConfigParser;
 import org.codelibs.fess.util.RenderDataUtil;
 import org.codelibs.fess.util.ResourceUtil;
 import org.codelibs.fess.util.SearchEngineUtil;
+import org.dbflute.bhv.readable.EntityRowHandler;
 import org.lastaflute.core.magic.async.AsyncManager;
 import org.lastaflute.web.Execute;
 import org.lastaflute.web.response.ActionResponse;
@@ -431,121 +437,157 @@ public class AdminBackupAction extends FessAdminAction {
     }
 
     public static Consumer<Writer> getSearchLogNdjsonWriteCall() {
+        final FessConfig fessConfig = ComponentUtil.getFessConfig();
+        final SystemHelper systemHelper = ComponentUtil.getSystemHelper();
+        final long timeout = fessConfig.getIndexBackupLogLoadTimeoutAsInteger().longValue();
         return writer -> {
             final SearchLogBhv bhv = ComponentUtil.getComponent(SearchLogBhv.class);
             bhv.selectCursor(cb -> {
                 cb.query().matchAll();
                 cb.query().addOrderBy_RequestedAt_Asc();
-            }, entity -> {
-                final StringBuilder buf = new StringBuilder();
-                buf.append('{');
-                appendJson("id", entity.getId(), buf).append(',');
-                appendJson("query-id", entity.getQueryId(), buf).append(',');
-                appendJson("user-info-id", entity.getUserInfoId(), buf).append(',');
-                appendJson("user-session-id", entity.getUserSessionId(), buf).append(',');
-                appendJson("user", entity.getUser(), buf).append(',');
-                appendJson("search-word", entity.getSearchWord(), buf).append(',');
-                appendJson("hit-count", entity.getHitCount(), buf).append(',');
-                appendJson("query-page-size", entity.getQueryPageSize(), buf).append(',');
-                appendJson("query-offset", entity.getQueryOffset(), buf).append(',');
-                appendJson("referer", entity.getReferer(), buf).append(',');
-                appendJson("languages", entity.getLanguages(), buf).append(',');
-                appendJson("roles", entity.getRoles(), buf).append(',');
-                appendJson("user-agent", entity.getUserAgent(), buf).append(',');
-                appendJson("client-ip", entity.getClientIp(), buf).append(',');
-                appendJson("access-type", entity.getAccessType(), buf).append(',');
-                appendJson("query-time", entity.getQueryTime(), buf).append(',');
-                appendJson("response-time", entity.getResponseTime(), buf).append(',');
-                appendJson("requested-at", entity.getRequestedAt(), buf).append(',');
-                final Map<String, List<String>> searchFieldMap = entity.getSearchFieldLogList().stream()
-                        .collect(Collectors.groupingBy(Pair::getFirst, Collectors.mapping(Pair::getSecond, Collectors.toList())));
-                appendJson("search-field", searchFieldMap, buf).append(',');
-                final Map<String, List<String>> requestHeaderMap = entity.getRequestHeaderList().stream()
-                        .collect(Collectors.groupingBy(Pair::getFirst, Collectors.mapping(Pair::getSecond, Collectors.toList())));
-                appendJson("headers", requestHeaderMap, buf);
-                buf.append('}');
-                buf.append('\n');
-                try {
-                    writer.write(buf.toString());
-                } catch (final IOException e) {
-                    throw new IORuntimeException(e);
+            }, new LogEntityRowHandler<SearchLog>() {
+                @Override
+                public void handle(final SearchLog entity) {
+                    final StringBuilder buf = new StringBuilder();
+                    buf.append('{');
+                    appendJson("id", entity.getId(), buf).append(',');
+                    appendJson("query-id", entity.getQueryId(), buf).append(',');
+                    appendJson("user-info-id", entity.getUserInfoId(), buf).append(',');
+                    appendJson("user-session-id", entity.getUserSessionId(), buf).append(',');
+                    appendJson("user", entity.getUser(), buf).append(',');
+                    appendJson("search-word", entity.getSearchWord(), buf).append(',');
+                    appendJson("hit-count", entity.getHitCount(), buf).append(',');
+                    appendJson("query-page-size", entity.getQueryPageSize(), buf).append(',');
+                    appendJson("query-offset", entity.getQueryOffset(), buf).append(',');
+                    appendJson("referer", entity.getReferer(), buf).append(',');
+                    appendJson("languages", entity.getLanguages(), buf).append(',');
+                    appendJson("roles", entity.getRoles(), buf).append(',');
+                    appendJson("user-agent", entity.getUserAgent(), buf).append(',');
+                    appendJson("client-ip", entity.getClientIp(), buf).append(',');
+                    appendJson("access-type", entity.getAccessType(), buf).append(',');
+                    appendJson("query-time", entity.getQueryTime(), buf).append(',');
+                    appendJson("response-time", entity.getResponseTime(), buf).append(',');
+                    appendJson("requested-at", entity.getRequestedAt(), buf).append(',');
+                    final Map<String, List<String>> searchFieldMap = entity.getSearchFieldLogList().stream()
+                            .collect(Collectors.groupingBy(Pair::getFirst, Collectors.mapping(Pair::getSecond, Collectors.toList())));
+                    appendJson("search-field", searchFieldMap, buf).append(',');
+                    final Map<String, List<String>> requestHeaderMap = entity.getRequestHeaderList().stream()
+                            .collect(Collectors.groupingBy(Pair::getFirst, Collectors.mapping(Pair::getSecond, Collectors.toList())));
+                    appendJson("headers", requestHeaderMap, buf);
+                    buf.append('}');
+                    buf.append('\n');
+                    try {
+                        writer.write(buf.toString());
+                    } catch (final IOException e) {
+                        throw new IORuntimeException(e);
+                    }
+                    if (!systemHelper.calibrateCpuLoad(timeout)) {
+                        breakCursor = true;
+                    }
                 }
             });
         };
     }
 
     public static Consumer<Writer> getUserInfoNdjsonWriteCall() {
+        final FessConfig fessConfig = ComponentUtil.getFessConfig();
+        final SystemHelper systemHelper = ComponentUtil.getSystemHelper();
+        final long timeout = fessConfig.getIndexBackupLogLoadTimeoutAsInteger().longValue();
         return writer -> {
             final UserInfoBhv bhv = ComponentUtil.getComponent(UserInfoBhv.class);
             bhv.selectCursor(cb -> {
                 cb.query().matchAll();
                 cb.query().addOrderBy_CreatedAt_Asc();
-            }, entity -> {
-                final StringBuilder buf = new StringBuilder();
-                buf.append('{');
-                appendJson("id", entity.getId(), buf).append(',');
-                appendJson("created-at", entity.getCreatedAt(), buf).append(',');
-                appendJson("updated-at", entity.getUpdatedAt(), buf);
-                buf.append('}');
-                buf.append('\n');
-                try {
-                    writer.write(buf.toString());
-                } catch (final IOException e) {
-                    throw new IORuntimeException(e);
+            }, new LogEntityRowHandler<UserInfo>() {
+                @Override
+                public void handle(final UserInfo entity) {
+                    final StringBuilder buf = new StringBuilder();
+                    buf.append('{');
+                    appendJson("id", entity.getId(), buf).append(',');
+                    appendJson("created-at", entity.getCreatedAt(), buf).append(',');
+                    appendJson("updated-at", entity.getUpdatedAt(), buf);
+                    buf.append('}');
+                    buf.append('\n');
+                    try {
+                        writer.write(buf.toString());
+                    } catch (final IOException e) {
+                        throw new IORuntimeException(e);
+                    }
+                    if (!systemHelper.calibrateCpuLoad(timeout)) {
+                        breakCursor = true;
+                    }
                 }
             });
         };
     }
 
     public static Consumer<Writer> getFavoriteLogNdjsonWriteCall() {
+        final FessConfig fessConfig = ComponentUtil.getFessConfig();
+        final SystemHelper systemHelper = ComponentUtil.getSystemHelper();
+        final long timeout = fessConfig.getIndexBackupLogLoadTimeoutAsInteger().longValue();
         return writer -> {
             final FavoriteLogBhv bhv = ComponentUtil.getComponent(FavoriteLogBhv.class);
             bhv.selectCursor(cb -> {
                 cb.query().matchAll();
                 cb.query().addOrderBy_CreatedAt_Asc();
-            }, entity -> {
-                final StringBuilder buf = new StringBuilder();
-                buf.append('{');
-                appendJson("id", entity.getId(), buf).append(',');
-                appendJson("created-at", entity.getCreatedAt(), buf).append(',');
-                appendJson("query-id", entity.getQueryId(), buf).append(',');
-                appendJson("user-info-id", entity.getUserInfoId(), buf).append(',');
-                appendJson("doc-id", entity.getDocId(), buf).append(',');
-                appendJson("url", entity.getUrl(), buf);
-                buf.append('}');
-                buf.append('\n');
-                try {
-                    writer.write(buf.toString());
-                } catch (final IOException e) {
-                    throw new IORuntimeException(e);
+            }, new LogEntityRowHandler<FavoriteLog>() {
+                @Override
+                public void handle(final FavoriteLog entity) {
+                    final StringBuilder buf = new StringBuilder();
+                    buf.append('{');
+                    appendJson("id", entity.getId(), buf).append(',');
+                    appendJson("created-at", entity.getCreatedAt(), buf).append(',');
+                    appendJson("query-id", entity.getQueryId(), buf).append(',');
+                    appendJson("user-info-id", entity.getUserInfoId(), buf).append(',');
+                    appendJson("doc-id", entity.getDocId(), buf).append(',');
+                    appendJson("url", entity.getUrl(), buf);
+                    buf.append('}');
+                    buf.append('\n');
+                    try {
+                        writer.write(buf.toString());
+                    } catch (final IOException e) {
+                        throw new IORuntimeException(e);
+                    }
+                    if (!systemHelper.calibrateCpuLoad(timeout)) {
+                        breakCursor = true;
+                    }
                 }
             });
         };
     }
 
     public static Consumer<Writer> getClickLogNdjsonWriteCall() {
+        final FessConfig fessConfig = ComponentUtil.getFessConfig();
+        final SystemHelper systemHelper = ComponentUtil.getSystemHelper();
+        final long timeout = fessConfig.getIndexBackupLogLoadTimeoutAsInteger().longValue();
         return writer -> {
             final ClickLogBhv bhv = ComponentUtil.getComponent(ClickLogBhv.class);
             bhv.selectCursor(cb -> {
                 cb.query().matchAll();
                 cb.query().addOrderBy_RequestedAt_Asc();
-            }, entity -> {
-                final StringBuilder buf = new StringBuilder();
-                buf.append('{');
-                appendJson("id", entity.getId(), buf).append(',');
-                appendJson("query-id", entity.getQueryId(), buf).append(',');
-                appendJson("user-session-id", entity.getUserSessionId(), buf).append(',');
-                appendJson("doc-id", entity.getDocId(), buf).append(',');
-                appendJson("url", entity.getUrl(), buf).append(',');
-                appendJson("order", entity.getOrder(), buf).append(',');
-                appendJson("query-requested-at", entity.getQueryRequestedAt(), buf).append(',');
-                appendJson("requested-at", entity.getRequestedAt(), buf);
-                buf.append('}');
-                buf.append('\n');
-                try {
-                    writer.write(buf.toString());
-                } catch (final IOException e) {
-                    throw new IORuntimeException(e);
+            }, new LogEntityRowHandler<ClickLog>() {
+                @Override
+                public void handle(final ClickLog entity) {
+                    final StringBuilder buf = new StringBuilder();
+                    buf.append('{');
+                    appendJson("id", entity.getId(), buf).append(',');
+                    appendJson("query-id", entity.getQueryId(), buf).append(',');
+                    appendJson("user-session-id", entity.getUserSessionId(), buf).append(',');
+                    appendJson("doc-id", entity.getDocId(), buf).append(',');
+                    appendJson("url", entity.getUrl(), buf).append(',');
+                    appendJson("order", entity.getOrder(), buf).append(',');
+                    appendJson("query-requested-at", entity.getQueryRequestedAt(), buf).append(',');
+                    appendJson("requested-at", entity.getRequestedAt(), buf);
+                    buf.append('}');
+                    buf.append('\n');
+                    try {
+                        writer.write(buf.toString());
+                    } catch (final IOException e) {
+                        throw new IORuntimeException(e);
+                    }
+                    if (!systemHelper.calibrateCpuLoad(timeout)) {
+                        breakCursor = true;
+                    }
                 }
             });
         };
@@ -572,4 +614,12 @@ public class AdminBackupAction extends FessAdminAction {
         }
     }
 
+    private static abstract class LogEntityRowHandler<ENTITY> implements EntityRowHandler<ENTITY> {
+        protected boolean breakCursor = false;
+
+        @Override
+        public boolean isBreakCursor() {
+            return breakCursor;
+        }
+    }
 }

+ 16 - 3
src/main/java/org/codelibs/fess/helper/SystemHelper.java

@@ -602,19 +602,31 @@ public class SystemHelper {
         }
     }
 
-    public void calibrateCpuLoad() {
+    public boolean calibrateCpuLoad() {
+        return calibrateCpuLoad(0L);
+    }
+
+    public boolean calibrateCpuLoad(final long timeoutInMillis) {
         final short percent = ComponentUtil.getFessConfig().getAdaptiveLoadControlAsInteger().shortValue();
         if (percent <= 0) {
-            return;
+            return true;
         }
         short current = getSystemCpuPercent();
         if (current < percent) {
-            return;
+            return true;
         }
+        final long startTime = getCurrentTimeAsLong();
         final String threadName = Thread.currentThread().getName();
         try {
             waitingThreadNames.add(threadName);
             while (current >= percent) {
+                if (timeoutInMillis > 0 && getCurrentTimeAsLong() - startTime > timeoutInMillis) {
+                    if (logger.isInfoEnabled()) {
+                        logger.info("Cpu Load {}% is greater than {}%. {} waiting thread(s). {} thread is timed out.", current, percent,
+                                waitingThreadNames.size(), threadName);
+                    }
+                    return false;
+                }
                 if (logger.isInfoEnabled()) {
                     logger.info("Cpu Load {}% is greater than {}%. {} waiting thread(s).", current, percent, waitingThreadNames.size());
                 }
@@ -627,6 +639,7 @@ public class SystemHelper {
         } finally {
             waitingThreadNames.remove(threadName);
         }
+        return true;
     }
 
     public void waitForNoWaitingThreads() {

+ 27 - 0
src/main/java/org/codelibs/fess/mylasta/direction/FessConfig.java

@@ -1176,6 +1176,9 @@ public interface FessConfig extends FessEnv, org.codelibs.fess.mylasta.direction
     /** The key of the configuration. e.g. click_log.ndjson,favorite_log.ndjson,search_log.ndjson,user_info.ndjson */
     String INDEX_BACKUP_LOG_TARGETS = "index.backup.log.targets";
 
+    /** The key of the configuration. e.g. 60000 */
+    String INDEX_BACKUP_LOG_LOAD_TIMEOUT = "index.backup.log.load.timeout";
+
     /** The key of the configuration. e.g. true */
     String LOGGING_SEARCH_DOCS_ENABLED = "logging.search.docs.enabled";
 
@@ -5505,6 +5508,21 @@ public interface FessConfig extends FessEnv, org.codelibs.fess.mylasta.direction
      */
     String getIndexBackupLogTargets();
 
+    /**
+     * Get the value for the key 'index.backup.log.load.timeout'. <br>
+     * The value is, e.g. 60000 <br>
+     * @return The value of found property. (NotNull: if not found, exception but basically no way)
+     */
+    String getIndexBackupLogLoadTimeout();
+
+    /**
+     * Get the value for the key 'index.backup.log.load.timeout' as {@link Integer}. <br>
+     * The value is, e.g. 60000 <br>
+     * @return The value of found property. (NotNull: if not found, exception but basically no way)
+     * @throws NumberFormatException When the property is not integer.
+     */
+    Integer getIndexBackupLogLoadTimeoutAsInteger();
+
     /**
      * Get the value for the key 'logging.search.docs.enabled'. <br>
      * The value is, e.g. true <br>
@@ -9611,6 +9629,14 @@ public interface FessConfig extends FessEnv, org.codelibs.fess.mylasta.direction
             return get(FessConfig.INDEX_BACKUP_LOG_TARGETS);
         }
 
+        public String getIndexBackupLogLoadTimeout() {
+            return get(FessConfig.INDEX_BACKUP_LOG_LOAD_TIMEOUT);
+        }
+
+        public Integer getIndexBackupLogLoadTimeoutAsInteger() {
+            return getAsInteger(FessConfig.INDEX_BACKUP_LOG_LOAD_TIMEOUT);
+        }
+
         public String getLoggingSearchDocsEnabled() {
             return get(FessConfig.LOGGING_SEARCH_DOCS_ENABLED);
         }
@@ -11155,6 +11181,7 @@ public interface FessConfig extends FessEnv, org.codelibs.fess.mylasta.direction
             defaultMap.put(FessConfig.INDEX_BACKUP_TARGETS,
                     "fess_basic_config.bulk,fess_config.bulk,fess_user.bulk,system.properties,fess.json,doc.json");
             defaultMap.put(FessConfig.INDEX_BACKUP_LOG_TARGETS, "click_log.ndjson,favorite_log.ndjson,search_log.ndjson,user_info.ndjson");
+            defaultMap.put(FessConfig.INDEX_BACKUP_LOG_LOAD_TIMEOUT, "60000");
             defaultMap.put(FessConfig.LOGGING_SEARCH_DOCS_ENABLED, "true");
             defaultMap.put(FessConfig.LOGGING_SEARCH_DOCS_FIELDS,
                     "filetype,created,click_count,title,doc_id,url,score,site,filename,host,digest,boost,mimetype,favorite_count,_id,lang,last_modified,content_length,timestamp");

+ 1 - 0
src/main/resources/fess_config.properties

@@ -621,6 +621,7 @@ ftp.role.from.file=true
 # backup
 index.backup.targets=fess_basic_config.bulk,fess_config.bulk,fess_user.bulk,system.properties,fess.json,doc.json
 index.backup.log.targets=click_log.ndjson,favorite_log.ndjson,search_log.ndjson,user_info.ndjson
+index.backup.log.load.timeout=60000
 
 # logging
 logging.search.docs.enabled=true