Переглянути джерело

update syslog to use work queue processor

Jason Rivard 9 роки тому
батько
коміт
55ea12dd7c

+ 39 - 83
src/main/java/password/pwm/svc/event/SyslogAuditService.java

@@ -45,10 +45,7 @@ import password.pwm.error.PwmOperationalException;
 import password.pwm.health.HealthRecord;
 import password.pwm.health.HealthStatus;
 import password.pwm.health.HealthTopic;
-import password.pwm.util.Helper;
-import password.pwm.util.JsonUtil;
-import password.pwm.util.TimeDuration;
-import password.pwm.util.X509Utils;
+import password.pwm.util.*;
 import password.pwm.util.localdb.LocalDB;
 import password.pwm.util.localdb.LocalDBException;
 import password.pwm.util.localdb.LocalDBStoredQueue;
@@ -67,33 +64,20 @@ public class SyslogAuditService {
     private static final PwmLogger LOGGER = PwmLogger.forClass(SyslogAuditService.class);
 
     private static final int WARNING_WINDOW_MS = 30 * 60 * 1000;
-    private static final String QUEUE_STORAGE_DELIMINATOR = "###";
     private static final String SYSLOG_INSTANCE_NAME = "syslog-audit";
 
-    private final int MAX_QUEUE_SIZE;
-    private final long MAX_AGE_MS;
-    private final long RETRY_TIMEOUT_MS;
-    private final LocalDBStoredQueue syslogQueue;
-
-    private volatile Date lastSendError;
     private SyslogIF syslogInstance = null;
     private ErrorInformation lastError = null;
     private X509Certificate[] certificates = null;
 
+    private WorkQueueProcessor<AuditRecord> workQueueProcessor;
+
+
     private final Configuration configuration;
-    private final Timer timer;
 
     public SyslogAuditService(final PwmApplication pwmApplication)
             throws LocalDBException
     {
-        timer = new Timer(Helper.makeThreadName(pwmApplication,SyslogAuditService.class),true);
-
-        MAX_QUEUE_SIZE = Integer.parseInt(pwmApplication.getConfig().readAppProperty(AppProperty.QUEUE_SYSLOG_MAX_COUNT));
-        MAX_AGE_MS = Long.parseLong(pwmApplication.getConfig().readAppProperty(AppProperty.QUEUE_SYSLOG_MAX_AGE_MS));
-        RETRY_TIMEOUT_MS = Long.parseLong(pwmApplication.getConfig().readAppProperty(AppProperty.QUEUE_SYSLOG_RETRY_TIMEOUT_MS));
-
-        syslogQueue = LocalDBStoredQueue.createLocalDBStoredQueue(pwmApplication, pwmApplication.getLocalDB(), LocalDB.DB.SYSLOG_QUEUE);
-
         this.configuration = pwmApplication.getConfig();
         this.certificates = configuration.readSettingAsCertificate(PwmSetting.AUDIT_SYSLOG_CERTIFICATES);
 
@@ -102,11 +86,31 @@ public class SyslogAuditService {
         try {
             syslogConfig = SyslogConfig.fromConfigString(syslogConfigString);
             syslogInstance = makeSyslogInstance(syslogConfig);
-            timer.schedule(new WriterTask(),1000);
             LOGGER.trace("queued service running for " + syslogConfig);
         } catch (IllegalArgumentException e) {
             LOGGER.error("error parsing syslog configuration for '" + syslogConfigString + "', error: " + e.getMessage());
         }
+
+        final WorkQueueProcessor.Settings settings = new WorkQueueProcessor.Settings();
+        settings.setMaxEvents(Integer.parseInt(pwmApplication.getConfig().readAppProperty(AppProperty.QUEUE_SYSLOG_MAX_COUNT)));
+        settings.setRetryDiscardAge(new TimeDuration(Long.parseLong(pwmApplication.getConfig().readAppProperty(AppProperty.QUEUE_SYSLOG_MAX_AGE_MS))));
+        settings.setRetryInterval(new TimeDuration(Long.parseLong(pwmApplication.getConfig().readAppProperty(AppProperty.QUEUE_SYSLOG_RETRY_TIMEOUT_MS))));
+
+        final LocalDBStoredQueue localDBStoredQueue = LocalDBStoredQueue.createLocalDBStoredQueue(pwmApplication, pwmApplication.getLocalDB(), LocalDB.DB.SYSLOG_QUEUE);
+
+        workQueueProcessor = new WorkQueueProcessor<>(pwmApplication, localDBStoredQueue, settings, new SyslogItemProcessor(), this.getClass());
+    }
+
+    private class SyslogItemProcessor implements WorkQueueProcessor.ItemProcessor<AuditRecord> {
+        @Override
+        public WorkQueueProcessor.ProcessResult process(AuditRecord workItem) {
+            return processEvent(workItem);
+        }
+
+        @Override
+        public String convertToDebugString(AuditRecord workItem) {
+            return JsonUtil.serialize(workItem);
+        }
     }
 
     private SyslogIF makeSyslogInstance(final SyslogConfig syslogConfig)
@@ -150,16 +154,11 @@ public class SyslogAuditService {
     }
 
     public void add(AuditRecord event) throws PwmOperationalException {
-        if (syslogQueue.size() >= MAX_QUEUE_SIZE) {
-            final String errorMsg = "dropping audit record event due to queue full " + event.toString() + ", queue length=" + syslogQueue.size();
-            LOGGER.warn(errorMsg);
-            throw new PwmOperationalException(PwmError.ERROR_SYSLOG_WRITE_ERROR,errorMsg);
+        try {
+            workQueueProcessor.submit(event);
+        } catch (PwmOperationalException e) {
+            LOGGER.warn("unable to add email to queue: " + e.getMessage());
         }
-
-        final String prefix = event.getClass().getCanonicalName();
-        final String jsonValue = prefix + QUEUE_STORAGE_DELIMINATOR + JsonUtil.serialize(event);
-        syslogQueue.offerLast(jsonValue);
-        timer.schedule(new WriterTask(),1);
     }
 
     public List<HealthRecord> healthCheck() {
@@ -174,72 +173,29 @@ public class SyslogAuditService {
         return healthRecords;
     }
 
-    private class WriterTask extends TimerTask {
-        @Override
-        public void run() {
-            if (lastSendError != null) {
-                if (TimeDuration.fromCurrent(lastSendError).isLongerThan(RETRY_TIMEOUT_MS)) {
-                    lastSendError = null;
-                }
-            }
-
-            while (!syslogQueue.isEmpty() && lastSendError == null) {
-                AuditRecord record = null;
-                try {
-                    final String storedString = syslogQueue.peekFirst();
-                    final String[] splitString = storedString.split(QUEUE_STORAGE_DELIMINATOR,2);
-                    final String className = splitString[0];
-                    final String jsonString = splitString[1];
-                    record = (AuditRecord) JsonUtil.deserialize(jsonString,Class.forName(className));
-                } catch (Exception e) {
-                    LOGGER.error("error decoding stored syslog event, discarding; error: " + e.getMessage());
-                    syslogQueue.removeFirst();
-                }
-                if (record != null) {
-                    final TimeDuration recordAge = TimeDuration.fromCurrent(record.getTimestamp());
-                    if (recordAge.isLongerThan(MAX_AGE_MS)) {
-                        LOGGER.info("discarding syslog audit event, maximum queued age exceeded: " + JsonUtil.serialize(record));
-                        syslogQueue.removeFirst();
-                    } else {
-                        boolean sendSuccess = processEvent(record);
-                        if (sendSuccess) {
-                            syslogQueue.removeFirst();
-                            lastSendError = null;
-                        } else {
-                            lastSendError = new Date();
-                            timer.schedule(new WriterTask(),RETRY_TIMEOUT_MS + 1);
-                        }
-                    }
-                }
-            }
-        }
-    }
-
-    private boolean processEvent(final AuditRecord auditRecord) {
-        final StringBuilder sb = new StringBuilder();
-        sb.append(PwmConstants.PWM_APP_NAME);
-        sb.append(" ");
-        sb.append(JsonUtil.serialize(auditRecord));
+    private WorkQueueProcessor.ProcessResult processEvent(final AuditRecord auditRecord) {
+        final String syslogEventString = JsonUtil.serialize(auditRecord);
 
         final SyslogIF syslogIF = syslogInstance;
         try {
-            syslogIF.info(sb.toString());
-            LOGGER.trace("delivered syslog audit event: " + sb.toString());
+            syslogIF.info(syslogEventString);
+            LOGGER.trace("delivered syslog audit event: " + syslogEventString);
             lastError = null;
-            return true;
+            return WorkQueueProcessor.ProcessResult.SUCCESS;
         } catch (Exception e) {
-            final ErrorInformation errorInformation = new ErrorInformation(PwmError.ERROR_SYSLOG_WRITE_ERROR, e.getMessage(), new String[]{e.getMessage()});
+            final String errorMsg = "error while sending syslog message to remote service: " + e.getMessage();
+            final ErrorInformation errorInformation = new ErrorInformation(PwmError.ERROR_SYSLOG_WRITE_ERROR, errorMsg, new String[]{e.getMessage()});
             lastError = errorInformation;
             LOGGER.error(errorInformation.toDebugStr());
         }
 
-        return false;
+        return WorkQueueProcessor.ProcessResult.RETRY;
     }
 
     public void close() {
         final SyslogIF syslogIF = syslogInstance;
         syslogIF.shutdown();
-        timer.cancel();
+        workQueueProcessor.close();
         syslogInstance = null;
     }
 
@@ -301,7 +257,7 @@ public class SyslogAuditService {
     }
 
     public int queueSize() {
-        return syslogQueue != null ? syslogQueue.size() : 0;
+        return workQueueProcessor.queueSize();
     }
 
     private class LocalTrustSyslogWriterClass extends SSLTCPNetSyslogWriter {

+ 20 - 7
src/main/java/password/pwm/svc/sessiontrack/SessionTrackService.java

@@ -25,8 +25,8 @@ package password.pwm.svc.sessiontrack;
 import password.pwm.PwmApplication;
 import password.pwm.bean.LocalSessionStateBean;
 import password.pwm.bean.LoginInfoBean;
-import password.pwm.bean.pub.SessionStateInfoBean;
 import password.pwm.bean.UserInfoBean;
+import password.pwm.bean.pub.SessionStateInfoBean;
 import password.pwm.error.PwmException;
 import password.pwm.health.HealthRecord;
 import password.pwm.http.PwmSession;
@@ -38,7 +38,7 @@ import java.util.*;
 public class SessionTrackService implements PwmService {
     private static final PwmLogger LOGGER = PwmLogger.forClass(SessionTrackService.class);
 
-    private Set<PwmSession> pwmSessions = Collections.newSetFromMap(new WeakHashMap<PwmSession, Boolean>());
+    private final Set<PwmSession> pwmSessions = Collections.newSetFromMap(new WeakHashMap<PwmSession, Boolean>());
 
     private PwmApplication pwmApplication;
 
@@ -54,7 +54,9 @@ public class SessionTrackService implements PwmService {
 
     @Override
     public void close() {
-        pwmSessions.clear();
+        synchronized (pwmSessions) {
+            pwmSessions.clear();
+        }
     }
 
     @Override
@@ -74,16 +76,27 @@ public class SessionTrackService implements PwmService {
     }
 
     public void addSessionData(final PwmSession pwmSession) {
-        pwmSessions.add(pwmSession);
+        synchronized (pwmSession) {
+            pwmSessions.add(pwmSession);
+        }
     }
 
     public void removeSessionData(final PwmSession pwmSession) {
-        pwmSessions.add(pwmSession);
+        synchronized (pwmSession) {
+            pwmSessions.add(pwmSession);
+        }
+    }
+
+    private Set<PwmSession> copyOfSessionSet() {
+        synchronized (pwmSessions) {
+            return new HashSet<>(pwmSessions);
+        }
+
     }
 
     public Map<DebugKey, String> getDebugData() {
         try {
-            final Collection<PwmSession> sessionCopy = new HashSet<>(pwmSessions);
+            final Collection<PwmSession> sessionCopy = copyOfSessionSet();
             int sessionCounter = 0;
             long sizeTotal = 0;
             for (final PwmSession pwmSession : sessionCopy) {
@@ -130,7 +143,7 @@ public class SessionTrackService implements PwmService {
 
     private Set<PwmSession> currentValidSessionSet() {
         final Set<PwmSession> returnSet = new HashSet<>();
-        for (final PwmSession pwmSession : new HashSet<>(pwmSessions)) {
+        for (final PwmSession pwmSession : copyOfSessionSet()) {
             if (pwmSession != null) {
                 returnSet.add(pwmSession);
             }

+ 36 - 24
src/main/java/password/pwm/util/WorkQueueProcessor.java

@@ -37,6 +37,7 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.LockSupport;
 
 /**
  * A work item queue manager.   Items submitted to the queue will eventually be worked on by the client side @code {@link ItemProcessor}.
@@ -89,6 +90,7 @@ public class WorkQueueProcessor<W extends Serializable> {
         if (!queue.isEmpty()) {
             LOGGER.debug("opening with " + queue.size() + " items in work queue");
         }
+        LOGGER.trace("initializing worker thread with settings " + JsonUtil.serialize(settings));
 
         this.workerThread = new WorkerThread();
         workerThread.setDaemon(true);
@@ -123,7 +125,8 @@ public class WorkQueueProcessor<W extends Serializable> {
             final String errorMsg = this.getClass().getName() + " has been closed, unable to submit new item";
             throw new PwmOperationalException(new ErrorInformation(PwmError.ERROR_UNKNOWN, errorMsg));
         }
-        final ItemWrapper<W> itemWrapper = new ItemWrapper<W>(new Date(), workItem, idGenerator.nextID());
+
+        final ItemWrapper<W> itemWrapper = new ItemWrapper<>(new Date(), workItem, idGenerator.nextID());
         final String asString = JsonUtil.serialize(itemWrapper);
 
         if (settings.getMaxEvents() > 0) {
@@ -152,7 +155,7 @@ public class WorkQueueProcessor<W extends Serializable> {
         return eldestItem;
     }
 
-    private String makeDebugText(ItemWrapper itemWrapper) throws PwmOperationalException {
+    private String makeDebugText(ItemWrapper<W> itemWrapper) throws PwmOperationalException {
         final int itemsInQueue = WorkQueueProcessor.this.queueSize();
         String traceMsg = "[" + itemWrapper.toDebugString(itemProcessor) + "]";
         if (itemsInQueue > 0) {
@@ -165,7 +168,7 @@ public class WorkQueueProcessor<W extends Serializable> {
 
         private final AtomicBoolean running = new AtomicBoolean(false);
         private final AtomicBoolean shutdownFlag = new AtomicBoolean(false);
-        private final AtomicBoolean pendingWork = new AtomicBoolean(true);
+        private final AtomicBoolean notifyWorkFlag = new AtomicBoolean(true);
 
         private Date retryWakeupTime;
 
@@ -181,43 +184,52 @@ public class WorkQueueProcessor<W extends Serializable> {
                 LOGGER.error("unexpected error processing work item queue: " + Helper.readHostileExceptionMessage(t), t);
             }
 
-            try {
-                final Date shutdownStartTime = new Date();
-                while (retryWakeupTime == null && !queue.isEmpty() && TimeDuration.fromCurrent(shutdownStartTime).isLongerThan(settings.getMaxShutdownWaitTime())) {
-                    processNextItem();
+            LOGGER.trace("worker thread beginning shutdown...");
+
+            if (!queue.isEmpty()) {
+                LOGGER.trace("processing remaining " + queue.size() + " items");
+
+                try {
+                    final Date shutdownStartTime = new Date();
+                    while (retryWakeupTime == null && !queue.isEmpty() && TimeDuration.fromCurrent(shutdownStartTime).isLongerThan(settings.getMaxShutdownWaitTime())) {
+                        processNextItem();
+                    }
+                } catch (Throwable t) {
+                    LOGGER.error("unexpected error processing work item queue: " + Helper.readHostileExceptionMessage(t), t);
                 }
-            } catch (Throwable t) {
-                LOGGER.error("unexpected error processing work item queue: " + Helper.readHostileExceptionMessage(t), t);
             }
+
+            LOGGER.trace("thread exiting...");
             running.set(false);
         }
 
         void flushQueueAndClose() {
             shutdownFlag.set(true);
+            LOGGER.trace("shutdown flag set");
+            notifyWorkPending();
         }
 
         void notifyWorkPending() {
-            pendingWork.set(true);
+            notifyWorkFlag.set(true);
+            LockSupport.unpark(this);
         }
 
         private void waitForWork() {
-            if (queue.isEmpty()) {
-                pendingWork.set(false);
-                eldestItem = null;
-                if (queue.isEmpty()) { // extra queue check in case submit() comes in after the pendingWork is set to false here;
-                    pendingWork.set(true);
-                }
-                while (!shutdownFlag.get() && !pendingWork.get()) { // sleep until shutdown or work arrives.
-                    Helper.pause(103);
+            if (!shutdownFlag.get()) {
+                if (retryWakeupTime != null) {
+                    while (retryWakeupTime.after(new Date()) && !shutdownFlag.get()) {
+                        LockSupport.parkUntil(this, retryWakeupTime.getTime());
+                    }
+                    retryWakeupTime = null;
+                } else {
+                    if (queue.isEmpty() && !notifyWorkFlag.get()) {
+                        eldestItem = null;
+                        LockSupport.park(this);
+                    }
                 }
             }
 
-            if (retryWakeupTime != null) {
-                while (!shutdownFlag.get() && new Date().before(retryWakeupTime)) {
-                    Helper.pause(103);
-                }
-                retryWakeupTime = null;
-            }
+            notifyWorkFlag.set(false);
         }
 
         public boolean isRunning() {

+ 1 - 1
src/main/webapp/WEB-INF/jsp/admin-activity.jsp

@@ -60,7 +60,7 @@
         <jsp:param name="pwm.PageName" value="<%=PageName%>"/>
     </jsp:include>
     <div id="centerbody" class="wide">
-        <div id="page-content-title"><pwm:display key="Title_UserActivity" bundle="Admin" displayIfMissing="true"/></div>
+        <div id="page-content-title"><pwm:display key="Title_UserActivity" bundle="Admin"/></div>
         <%@ include file="fragment/admin-nav.jsp" %>
         <div data-dojo-type="dijit/layout/TabContainer" style="width: 100%; height: 100%;" data-dojo-props="doLayout: false, persist: true">
             <div id="ActiveWebSessions" data-dojo-type="dijit/layout/ContentPane" title="<pwm:display key="Title_Sessions" bundle="Admin"/>" class="tabContent">

+ 1 - 1
src/main/webapp/WEB-INF/jsp/admin-analysis.jsp

@@ -60,7 +60,7 @@
         <jsp:param name="pwm.PageName" value="<%=PageName%>"/>
     </jsp:include>
     <div id="centerbody" class="wide">
-        <div id="page-content-title"><pwm:display key="Title_DataAnalysis" bundle="Admin" displayIfMissing="true"/></div>
+        <div id="page-content-title"><pwm:display key="Title_DataAnalysis" bundle="Admin"/></div>
         <%@ include file="fragment/admin-nav.jsp" %>
         <div data-dojo-type="dijit.layout.TabContainer" style="width: 100%; height: 100%;"  data-dojo-props="doLayout: false, persist: true" id="analysis-topLevelTab">
             <div data-dojo-type="dijit.layout.TabContainer" style="width: 100%; height: 100%;" data-dojo-props="doLayout: false, persist: true" title="<pwm:display key="Title_DirectoryReporting" bundle="Admin"/>">

+ 2 - 2
src/main/webapp/WEB-INF/jsp/admin-dashboard.jsp

@@ -62,15 +62,15 @@
     }
 %>
 <html lang="<pwm:value name="<%=PwmValue.localeCode%>"/>" dir="<pwm:value name="<%=PwmValue.localeDir%>"/>">
+<% String PageName = JspUtility.localizedString(pageContext,"Title_Dashboard",Admin.class);%>
 <%@ include file="/WEB-INF/jsp/fragment/header.jsp" %>
 <body class="nihilo">
 <div id="wrapper">
-    <% String PageName = JspUtility.localizedString(pageContext,"Title_Dashboard",Admin.class);%>
     <jsp:include page="/WEB-INF/jsp/fragment/header-body.jsp">
         <jsp:param name="pwm.PageName" value="<%=PageName%>"/>
     </jsp:include>
     <div id="centerbody">
-        <div id="page-content-title"><pwm:display key="Title_Dashboard" bundle="Admin" displayIfMissing="true"/></div>
+        <div id="page-content-title"><pwm:display key="Title_Dashboard" bundle="Admin"/></div>
         <%@ include file="fragment/admin-nav.jsp" %>
         <div id="DashboardTabContainer" data-dojo-type="dijit.layout.TabContainer" style="width: 100%; height: 100%;" data-dojo-props="doLayout: false, persist: true">
             <div id="StatusTab" data-dojo-type="dijit.layout.ContentPane" title="Status" class="tabContent">

+ 3 - 3
src/main/webapp/WEB-INF/jsp/fragment/admin-nav.jsp

@@ -41,21 +41,21 @@
     <form action="<%=AdminServlet.Page.dashboard%>" method="get" id="dashboard" name="dashboard">
         <button type="submit" class="navbutton<%=selected?" selected":""%>">
             <pwm:if test="<%=PwmIfTest.showIcons%>"><span class="btn-icon pwm-icon pwm-icon-dashboard"></span></pwm:if>
-            Dashboard
+            <pwm:display key="Title_Dashboard" bundle="Admin"/>
         </button>
     </form>
     <% selected = currentPage == AdminServlet.Page.activity; %>
     <form action="<%=AdminServlet.Page.activity%>" method="get" id="activity" name="activity">
         <button type="submit" class="navbutton<%=selected?" selected":""%>">
             <pwm:if test="<%=PwmIfTest.showIcons%>"><span class="btn-icon pwm-icon pwm-icon-users"></span></pwm:if>
-            User Activity
+            <pwm:display key="Title_UserActivity" bundle="Admin"/>
         </button>
     </form>
     <% selected = currentPage == AdminServlet.Page.analysis; %>
     <form action="<%=AdminServlet.Page.analysis%>" method="get" id="analysis" name="analysis">
         <button type="submit" class="navbutton<%=selected?" selected":""%>">
             <pwm:if test="<%=PwmIfTest.showIcons%>"><span class="btn-icon pwm-icon pwm-icon-bar-chart-o"></span></pwm:if>
-            Data Analysis
+            <pwm:display key="Title_DataAnalysis" bundle="Admin"/>
         </button>
     </form>
     <div style="display: inline" id="admin-nav-menu-container">