Explorar el Código

Convert SmsQueue management to WorkQueueProcessor

Jason Rivard hace 9 años
padre
commit
4bf2f46fa8

+ 0 - 1
src/main/java/password/pwm/health/LDAPStatusChecker.java

@@ -233,7 +233,6 @@ public class LDAPStatusChecker implements HealthChecker {
             try {
             try {
                 final UserIdentity userIdentity = new UserIdentity(theUser.getEntryDN(),ldapProfile.getIdentifier());
                 final UserIdentity userIdentity = new UserIdentity(theUser.getEntryDN(),ldapProfile.getIdentifier());
                 final UserStatusReader.Settings readerSettings = new UserStatusReader.Settings();
                 final UserStatusReader.Settings readerSettings = new UserStatusReader.Settings();
-                readerSettings.setSkipReportUpdate(true);
                 final UserStatusReader userStatusReader = new UserStatusReader(
                 final UserStatusReader userStatusReader = new UserStatusReader(
                         pwmApplication,
                         pwmApplication,
                         PwmConstants.HEALTH_SESSION_LABEL,
                         PwmConstants.HEALTH_SESSION_LABEL,

+ 7 - 7
src/main/java/password/pwm/http/ContextManager.java

@@ -412,7 +412,7 @@ public class ContextManager implements Serializable {
                 return contextAppPathSetting;
                 return contextAppPathSetting;
             }
             }
 
 
-            final String contextPath = servletContext.getContextPath().replace("/","");
+            final String contextPath = servletContext.getContextPath().replace("/", "");
             return PwmEnvironment.ParseHelper.readValueFromSystem(
             return PwmEnvironment.ParseHelper.readValueFromSystem(
                     PwmEnvironment.EnvironmentParameter.applicationPath,
                     PwmEnvironment.EnvironmentParameter.applicationPath,
                     contextPath
                     contextPath
@@ -426,18 +426,18 @@ public class ContextManager implements Serializable {
                 return PwmEnvironment.ParseHelper.parseApplicationFlagValueParameter(contextAppFlagsValue);
                 return PwmEnvironment.ParseHelper.parseApplicationFlagValueParameter(contextAppFlagsValue);
             }
             }
 
 
-            final String contextPath = servletContext.getContextPath().replace("/","");
+            final String contextPath = servletContext.getContextPath().replace("/", "");
             return PwmEnvironment.ParseHelper.readApplicationFlagsFromSystem(contextPath);
             return PwmEnvironment.ParseHelper.readApplicationFlagsFromSystem(contextPath);
         }
         }
 
 
-        Map<PwmEnvironment.ApplicationParameter,String> readApplicationParams() {
+        Map<PwmEnvironment.ApplicationParameter, String> readApplicationParams() {
             final String contextAppParamsValue = readEnvironmentParameter(PwmEnvironment.EnvironmentParameter.applicationParamFile);
             final String contextAppParamsValue = readEnvironmentParameter(PwmEnvironment.EnvironmentParameter.applicationParamFile);
 
 
             if (contextAppParamsValue != null && !contextAppParamsValue.isEmpty()) {
             if (contextAppParamsValue != null && !contextAppParamsValue.isEmpty()) {
                 return PwmEnvironment.ParseHelper.parseApplicationParamValueParameter(contextAppParamsValue);
                 return PwmEnvironment.ParseHelper.parseApplicationParamValueParameter(contextAppParamsValue);
             }
             }
 
 
-            final String contextPath = servletContext.getContextPath().replace("/","");
+            final String contextPath = servletContext.getContextPath().replace("/", "");
             return PwmEnvironment.ParseHelper.readApplicationParmsFromSystem(contextPath);
             return PwmEnvironment.ParseHelper.readApplicationParmsFromSystem(contextPath);
         }
         }
 
 
@@ -451,12 +451,12 @@ public class ContextManager implements Serializable {
                     return value;
                     return value;
                 }
                 }
             }
             }
+            return null;
+        }
+    }
 
 
     public String getContextPath() {
     public String getContextPath() {
         return contextPath;
         return contextPath;
     }
     }
 
 
-            return null;
-        }
-    }
 }
 }

+ 0 - 22
src/main/java/password/pwm/ldap/UserStatusReader.java

@@ -43,7 +43,6 @@ import password.pwm.util.JsonUtil;
 import password.pwm.util.PasswordData;
 import password.pwm.util.PasswordData;
 import password.pwm.util.PwmPasswordRuleValidator;
 import password.pwm.util.PwmPasswordRuleValidator;
 import password.pwm.util.TimeDuration;
 import password.pwm.util.TimeDuration;
-import password.pwm.util.localdb.LocalDBException;
 import password.pwm.util.logging.PwmLogger;
 import password.pwm.util.logging.PwmLogger;
 import password.pwm.util.operations.CrService;
 import password.pwm.util.operations.CrService;
 import password.pwm.util.operations.OtpService;
 import password.pwm.util.operations.OtpService;
@@ -379,15 +378,6 @@ public class UserStatusReader {
             LOGGER.error(sessionLabel, "error reading account expired date for user '" + userIdentity + "', " + e.getMessage());
             LOGGER.error(sessionLabel, "error reading account expired date for user '" + userIdentity + "', " + e.getMessage());
         }
         }
 
 
-        // update report engine.
-        if (!settings.isSkipReportUpdate()) {
-            try {
-                pwmApplication.getReportService().updateCachedRecordFromLdap(uiBean);
-            } catch (LocalDBException e) {
-                LOGGER.error(sessionLabel, "error updating report cache data ldap login time: " + e.getMessage());
-            }
-        }
-
         LOGGER.trace(sessionLabel, "populateUserInfoBean for " + userIdentity + " completed in " + TimeDuration.fromCurrent(methodStartTime).asCompactString());
         LOGGER.trace(sessionLabel, "populateUserInfoBean for " + userIdentity + " completed in " + TimeDuration.fromCurrent(methodStartTime).asCompactString());
     }
     }
 
 
@@ -442,18 +432,6 @@ public class UserStatusReader {
     }
     }
 
 
     public static class Settings implements Serializable {
     public static class Settings implements Serializable {
-        private boolean skipReportUpdate;
-
-        public boolean isSkipReportUpdate()
-        {
-            return skipReportUpdate;
-        }
-
-        public void setSkipReportUpdate(boolean skipReportUpdate)
-        {
-            this.skipReportUpdate = skipReportUpdate;
-        }
-
         private Settings copy() {
         private Settings copy() {
             return JsonUtil.deserialize(JsonUtil.serialize(this),this.getClass());
             return JsonUtil.deserialize(JsonUtil.serialize(this),this.getClass());
         }
         }

+ 0 - 12
src/main/java/password/pwm/svc/report/ReportService.java

@@ -308,17 +308,6 @@ public class ReportService implements PwmService {
         }
         }
     }
     }
 
 
-    public boolean updateCachedRecordFromLdap(final UserInfoBean uiBean)
-            throws LocalDBException, PwmUnrecoverableException, ChaiUnavailableException
-    {
-        if (status != STATUS.OPEN) {
-            return false;
-        }
-
-        final UserCacheService.StorageKey storageKey = UserCacheService.StorageKey.fromUserInfoBean(uiBean);
-        return updateCachedRecordFromLdap(uiBean.getUserIdentity(), uiBean, storageKey);
-    }
-
     private boolean updateCachedRecordFromLdap(final UserIdentity userIdentity)
     private boolean updateCachedRecordFromLdap(final UserIdentity userIdentity)
             throws ChaiUnavailableException, PwmUnrecoverableException, LocalDBException
             throws ChaiUnavailableException, PwmUnrecoverableException, LocalDBException
     {
     {
@@ -369,7 +358,6 @@ public class ReportService implements PwmService {
             } else {
             } else {
                 newUserBean = new UserInfoBean();
                 newUserBean = new UserInfoBean();
                 final UserStatusReader.Settings readerSettings = new UserStatusReader.Settings();
                 final UserStatusReader.Settings readerSettings = new UserStatusReader.Settings();
-                readerSettings.setSkipReportUpdate(true);
                 final ChaiProvider chaiProvider = pwmApplication.getProxyChaiProvider(userIdentity.getLdapProfileID());
                 final ChaiProvider chaiProvider = pwmApplication.getProxyChaiProvider(userIdentity.getLdapProfileID());
                 final UserStatusReader userStatusReader = new UserStatusReader(pwmApplication,PwmConstants.REPORTING_SESSION_LABEL,readerSettings);
                 final UserStatusReader userStatusReader = new UserStatusReader(pwmApplication,PwmConstants.REPORTING_SESSION_LABEL,readerSettings);
                 userStatusReader.populateUserInfoBean(
                 userStatusReader.populateUserInfoBean(

+ 4 - 0
src/main/java/password/pwm/util/StringUtil.java

@@ -289,6 +289,10 @@ public abstract class StringUtil {
         return chunks.toArray(new String[numOfChunks]);
         return chunks.toArray(new String[numOfChunks]);
     }
     }
 
 
+    public static String mapToString(final Map map) {
+        return mapToString(map, "=", ",");
+    }
+
     public static String mapToString(final Map map, final String keyValueSeparator, final String recordSeparator) {
     public static String mapToString(final Map map, final String keyValueSeparator, final String recordSeparator) {
         final StringBuilder sb = new StringBuilder();
         final StringBuilder sb = new StringBuilder();
         for (final Iterator iterator = map.keySet().iterator(); iterator.hasNext(); ) {
         for (final Iterator iterator = map.keySet().iterator(); iterator.hasNext(); ) {

+ 9 - 6
src/main/java/password/pwm/util/WorkQueueProcessor.java

@@ -45,6 +45,9 @@ import java.util.concurrent.locks.LockSupport;
  */
  */
 public class WorkQueueProcessor<W extends Serializable> {
 public class WorkQueueProcessor<W extends Serializable> {
 
 
+    private static final TimeDuration SUBMIT_QUEUE_FULL_RETRY_CYCLE_INTERVAL = new TimeDuration(100, TimeUnit.MILLISECONDS);
+    private static final TimeDuration CLOSE_RETRY_CYCLE_INTERVAL = new TimeDuration(100, TimeUnit.MILLISECONDS);
+
     private final Deque<String> queue;
     private final Deque<String> queue;
     private final Settings settings;
     private final Settings settings;
     private final ItemProcessor<W> itemProcessor;
     private final ItemProcessor<W> itemProcessor;
@@ -112,7 +115,7 @@ public class WorkQueueProcessor<W extends Serializable> {
             LOGGER.debug("attempting to flush queue prior to shutdown, items in queue=" + queueSize());
             LOGGER.debug("attempting to flush queue prior to shutdown, items in queue=" + queueSize());
         }
         }
         while (localWorkerThread.isRunning() && TimeDuration.fromCurrent(shutdownStartTime).isLongerThan(settings.getMaxShutdownWaitTime())) {
         while (localWorkerThread.isRunning() && TimeDuration.fromCurrent(shutdownStartTime).isLongerThan(settings.getMaxShutdownWaitTime())) {
-            Helper.pause(100);
+            Helper.pause(CLOSE_RETRY_CYCLE_INTERVAL.getTotalMilliseconds());
         }
         }
 
 
         if (!queue.isEmpty()) {
         if (!queue.isEmpty()) {
@@ -137,7 +140,7 @@ public class WorkQueueProcessor<W extends Serializable> {
                             + ", item=" + itemProcessor.convertToDebugString(workItem);
                             + ", item=" + itemProcessor.convertToDebugString(workItem);
                     throw new PwmOperationalException(new ErrorInformation(PwmError.ERROR_UNKNOWN, errorMsg));
                     throw new PwmOperationalException(new ErrorInformation(PwmError.ERROR_UNKNOWN, errorMsg));
                 }
                 }
-                Helper.pause(100);
+                Helper.pause(SUBMIT_QUEUE_FULL_RETRY_CYCLE_INTERVAL.getTotalMilliseconds());
             }
             }
 
 
             eldestItem = itemWrapper.getDate();
             eldestItem = itemWrapper.getDate();
@@ -250,7 +253,7 @@ public class WorkQueueProcessor<W extends Serializable> {
                     removeQueueTop();
                     removeQueueTop();
                     return;
                     return;
                 }
                 }
-            } catch (Exception e) {
+            } catch (Throwable e) {
                 LOGGER.error("error reading queued item: " + e.getMessage(), e);
                 LOGGER.error("error reading queued item: " + e.getMessage(), e);
                 removeQueueTop();
                 removeQueueTop();
                 return;
                 return;
@@ -286,8 +289,8 @@ public class WorkQueueProcessor<W extends Serializable> {
                             throw new IllegalStateException("unexpected processResult type " + processResult);
                             throw new IllegalStateException("unexpected processResult type " + processResult);
                     }
                     }
                 }
                 }
-            } catch(PwmOperationalException e){
-                LOGGER.error("unexpected error while processing work queue: " + e.getErrorInformation());
+            } catch(Throwable e) {
+                LOGGER.error("unexpected error while processing work queue: " + e.getMessage());
                 removeQueueTop();
                 removeQueueTop();
             }
             }
 
 
@@ -351,7 +354,7 @@ public class WorkQueueProcessor<W extends Serializable> {
 
 
     public static class Settings implements Serializable {
     public static class Settings implements Serializable {
         private int maxEvents = 1000;
         private int maxEvents = 1000;
-        private TimeDuration maxSubmitWaitTime = new TimeDuration(20, TimeUnit.SECONDS);
+        private TimeDuration maxSubmitWaitTime = new TimeDuration(5, TimeUnit.SECONDS);
         private TimeDuration retryInterval = new TimeDuration(30, TimeUnit.SECONDS);
         private TimeDuration retryInterval = new TimeDuration(30, TimeUnit.SECONDS);
         private TimeDuration retryDiscardAge = new TimeDuration(1, TimeUnit.HOURS);
         private TimeDuration retryDiscardAge = new TimeDuration(1, TimeUnit.HOURS);
         private TimeDuration maxShutdownWaitTime = new TimeDuration(30, TimeUnit.SECONDS);
         private TimeDuration maxShutdownWaitTime = new TimeDuration(30, TimeUnit.SECONDS);

+ 0 - 384
src/main/java/password/pwm/util/queue/AbstractQueueManager.java

@@ -1,384 +0,0 @@
-/*
- * Password Management Servlets (PWM)
- * http://www.pwm-project.org
- *
- * Copyright (c) 2006-2009 Novell, Inc.
- * Copyright (c) 2009-2016 The PWM Project
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
- */
-
-package password.pwm.util.queue;
-
-import password.pwm.AppProperty;
-import password.pwm.PwmApplication;
-import password.pwm.PwmApplicationMode;
-import password.pwm.config.option.DataStorageMethod;
-import password.pwm.error.*;
-import password.pwm.health.HealthMessage;
-import password.pwm.health.HealthRecord;
-import password.pwm.svc.PwmService;
-import password.pwm.util.Helper;
-import password.pwm.util.JsonUtil;
-import password.pwm.util.TimeDuration;
-import password.pwm.util.localdb.LocalDB;
-import password.pwm.util.localdb.LocalDBStoredQueue;
-import password.pwm.util.logging.PwmLogger;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.*;
-
-public abstract class AbstractQueueManager implements PwmService {
-    protected PwmLogger LOGGER = PwmLogger.forClass(AbstractQueueManager.class);
-
-    private static final long QUEUE_POLL_INTERVAL = 30 * 1003;
-
-    protected PwmApplication pwmApplication;
-    protected STATUS status = PwmService.STATUS.NEW;
-    protected Timer timerThread;
-    protected Settings settings;
-
-    protected Date lastSendTime = new Date();
-    private LocalDBStoredQueue sendQueue;
-    protected int itemIDCounter;
-    protected PwmApplication.AppAttribute itemCountAppAttribute;
-    protected String serviceName = AbstractQueueManager.class.getSimpleName();
-
-    protected FailureInfo lastFailure;
-
-    static class FailureInfo {
-        private Date time = new Date();
-        private ErrorInformation errorInformation;
-        private QueueEvent queueEvent;
-
-        public FailureInfo(ErrorInformation errorInformation, QueueEvent queueEvent) {
-            this.errorInformation = errorInformation;
-            this.queueEvent = queueEvent;
-        }
-
-        public Date getTime() {
-            return time;
-        }
-
-        public ErrorInformation getErrorInformation() {
-            return errorInformation;
-        }
-
-        public QueueEvent getQueueEvent() {
-            return queueEvent;
-        }
-    }
-
-
-    public STATUS status() {
-        return status;
-    }
-
-    public int queueSize() {
-        if (sendQueue == null || status != STATUS.OPEN) {
-            return 0;
-        }
-
-        return this.sendQueue.size();
-    }
-    
-    public Date eldestItem() {
-        if (status != STATUS.OPEN) {
-            return null;
-        }
-        final String jsonEvent = sendQueue.peekFirst();
-        if (jsonEvent != null) {
-            final QueueEvent event = JsonUtil.deserialize(jsonEvent, QueueEvent.class);
-            if (event != null) {
-                return event.getTimestamp();
-            }
-        }
-        return null;
-    }
-
-    protected void add(final Serializable input)
-            throws PwmUnrecoverableException
-    {
-        final String jsonInput = JsonUtil.serialize(input);
-        final int nextItemID = getNextItemCount();
-        final QueueEvent event = new QueueEvent(jsonInput, new Date(), nextItemID);
-
-        if (status != PwmService.STATUS.OPEN) {
-            throw new PwmUnrecoverableException(new ErrorInformation(PwmError.ERROR_CLOSING));
-        }
-
-        if (sendQueue.size() >= settings.getMaxQueueItemCount()) {
-            LOGGER.warn("queue full, discarding item send request: " + event.getItem());
-            return;
-        }
-
-        final String jsonEvent = JsonUtil.serialize(event);
-        sendQueue.addLast(jsonEvent);
-        LOGGER.trace("submitted item to queue: " + queueItemToDebugString(event) + ", queue size: " + sendQueue.size());
-
-        timerThread.schedule(new QueueProcessorTask(), 1);
-    }
-
-    protected static class QueueEvent implements Serializable {
-        private String item;
-        private Date timestamp;
-        private int itemID;
-
-        protected QueueEvent(
-                final String item,
-                final Date timestamp,
-                final int itemID
-        ) {
-            this.item = item;
-            this.timestamp = timestamp;
-            this.itemID = itemID;
-        }
-
-        public String getItem() {
-            return item;
-        }
-
-        public Date getTimestamp() {
-            return timestamp;
-        }
-
-        public int getItemID()
-        {
-            return itemID;
-        }
-    }
-
-    boolean sendIsRetryable(final Exception e) {
-        if (e != null) {
-            final Throwable cause = e.getCause();
-            if (cause instanceof IOException) {
-                LOGGER.trace("message send failure cause is due to an IOException: " + e.getMessage());
-                return true;
-            }
-        }
-        return false;
-    }
-
-    public void init(
-            final PwmApplication pwmApplication,
-            final LocalDB.DB DB,
-            final Settings settings,
-            final PwmApplication.AppAttribute itemCountAppAttribute,
-            final String serviceName
-    )
-            throws PwmException
-    {
-        this.serviceName = serviceName;
-        this.pwmApplication = pwmApplication;
-        this.itemCountAppAttribute = itemCountAppAttribute;
-        this.settings = settings;
-
-        final LocalDB localDB = this.pwmApplication.getLocalDB();
-
-        if (localDB == null || localDB.status() != LocalDB.Status.OPEN) {
-            status = STATUS.CLOSED;
-            return;
-        }
-
-        if (pwmApplication.getApplicationMode() == PwmApplicationMode.READ_ONLY) {
-            status = STATUS.CLOSED;
-            return;
-        }
-
-        itemIDCounter = readItemIDCounter();
-        sendQueue = LocalDBStoredQueue.createLocalDBStoredQueue(pwmApplication, localDB, DB);
-        final String threadName = Helper.makeThreadName(pwmApplication, this.getClass()) + " timer thread";
-        timerThread = new Timer(threadName,true);
-        status = PwmService.STATUS.OPEN;
-        LOGGER.debug(settings.getDebugName() + " is now open, " + sendQueue.size() + " items in queue");
-        timerThread.schedule(new QueueProcessorTask(),1,QUEUE_POLL_INTERVAL);
-    }
-
-    protected int readItemIDCounter() {
-        final String itemCountStr = pwmApplication.readAppAttribute(itemCountAppAttribute, String.class);
-        if (itemCountStr != null) {
-            try {
-                return Integer.parseInt(itemCountStr);
-            } catch (Exception e) {
-                LOGGER.error("error reading stored item counter app attribute: " + e.getMessage());
-            }
-        }
-        return 0;
-    }
-
-    protected void storeItemCounter() {
-        try {
-            pwmApplication.writeAppAttribute(itemCountAppAttribute, String.valueOf(itemIDCounter));
-        } catch (Exception e) {
-            LOGGER.error("error writing stored item counter app attribute: " + e.getMessage());
-        }
-    }
-
-    protected synchronized int getNextItemCount() {
-        itemIDCounter++;
-        if (itemIDCounter < 0) {
-            itemIDCounter = 0;
-        }
-        storeItemCounter();
-        return itemIDCounter;
-    }
-
-    public synchronized void close() {
-        status = PwmService.STATUS.CLOSED;
-        final Date startTime = new Date();
-        final int maxCloseWaitMs = Integer.parseInt(pwmApplication.getConfig().readAppProperty(AppProperty.QUEUE_MAX_CLOSE_TIMEOUT_MS));
-
-        if (sendQueue != null && !sendQueue.isEmpty()) {
-            if (timerThread != null) {
-                timerThread.schedule(new QueueProcessorTask(),1);
-                LOGGER.warn("waiting up to 5 seconds for " + sendQueue.size() + " items in the queue to process");
-                while (!sendQueue.isEmpty() && TimeDuration.fromCurrent(startTime).isShorterThan(maxCloseWaitMs)) {
-                    Helper.pause(100);
-                }
-            }
-            if (!sendQueue.isEmpty()) {
-                LOGGER.warn("closing queue with " + sendQueue.size() + " message in queue");
-            }
-        }
-
-        if (timerThread != null) {
-            timerThread.cancel();
-        }
-        timerThread = null;
-    }
-
-    public List<HealthRecord> healthCheck() {
-        if (pwmApplication.getLocalDB() == null || pwmApplication.getLocalDB().status() != LocalDB.Status.OPEN) {
-            return Collections.singletonList(HealthRecord.forMessage(HealthMessage.ServiceClosed_LocalDBUnavail, serviceName));
-        }
-
-        if (pwmApplication.getApplicationMode() == PwmApplicationMode.READ_ONLY) {
-            return Collections.singletonList(HealthRecord.forMessage(HealthMessage.ServiceClosed_AppReadOnly,serviceName));
-        }
-
-        if (lastFailure != null) {
-            return this.failureToHealthRecord(lastFailure);
-        }
-
-        return Collections.emptyList();
-    }
-
-    private void processQueue() {
-        if (lastFailure != null) {
-            final TimeDuration timeSinceFailure = TimeDuration.fromCurrent(lastSendTime);
-            if (timeSinceFailure.isShorterThan(settings.getErrorRetryWaitTime())) {
-                return;
-            }
-        }
-
-        lastSendTime = new Date();
-
-        boolean sendFailure = false;
-        while (sendQueue.peekFirst() != null && !sendFailure) {
-            final String jsonEvent = sendQueue.peekFirst();
-            if (jsonEvent != null) {
-                final QueueEvent event = JsonUtil.deserialize(jsonEvent, QueueEvent.class);
-
-                if (event == null || event.getTimestamp() == null) {
-                    sendQueue.pollFirst();
-                } else if (TimeDuration.fromCurrent(event.getTimestamp()).isLongerThan(
-                        settings.getMaxQueueItemAge())) {
-                    LOGGER.debug("discarding event due to maximum retry age: " + queueItemToDebugString(event));
-                    sendQueue.pollFirst();
-                    noteDiscardedItem(event);
-                } else {
-                    final String item = event.getItem();
-
-                    LOGGER.trace("preparing to send item in queue: " + queueItemToDebugString(
-                            event) + ", queue size: " + sendQueue.size());
-
-                    // execute operation
-                    try {
-                        sendItem(item);
-                        sendQueue.pollFirst();
-                        LOGGER.trace("queued item processed and removed from queue: " + queueItemToDebugString(event) + ", queue size: " + sendQueue.size());
-                        lastFailure = null;
-                    } catch (PwmOperationalException e) {
-                        sendFailure = true;
-                        lastFailure = new FailureInfo(e.getErrorInformation(),event);
-                        LOGGER.debug("queued item was not successfully processed, will retry: " + queueItemToDebugString(event) + ", queue size: " + sendQueue.size());
-                    }
-                }
-            }
-        }
-    }
-
-
-    abstract void sendItem(String item) throws PwmOperationalException;
-
-    abstract List<HealthRecord> failureToHealthRecord(FailureInfo failureInfo);
-
-
-    // -------------------------- INNER CLASSES --------------------------
-
-    protected class QueueProcessorTask extends TimerTask {
-        public void run() {
-            try {
-                processQueue();
-            } catch (Exception e) {
-                LOGGER.error("unexpected exception while processing " + settings.getDebugName() + " queue: " + e.getMessage(), e);
-            }
-        }
-    }
-
-    protected static class Settings {
-        private TimeDuration maxQueueItemAge;
-        private TimeDuration errorRetryWaitTime;
-        private int maxQueueItemCount;
-        private String debugName;
-
-        public Settings(TimeDuration maxQueueItemAge, TimeDuration errorRetryWaitTime, int maxQueueItemCount, String debugName) {
-            this.maxQueueItemAge = maxQueueItemAge;
-            this.errorRetryWaitTime = errorRetryWaitTime;
-            this.maxQueueItemCount = maxQueueItemCount;
-            this.debugName = debugName;
-        }
-
-        public TimeDuration getMaxQueueItemAge() {
-            return maxQueueItemAge;
-        }
-
-        public TimeDuration getErrorRetryWaitTime() {
-            return errorRetryWaitTime;
-        }
-
-        public int getMaxQueueItemCount() {
-            return maxQueueItemCount;
-        }
-
-        public String getDebugName() {
-            return debugName;
-        }
-    }
-
-    public ServiceInfo serviceInfo()
-    {
-        if (status() == STATUS.OPEN) {
-            return new ServiceInfo(Collections.singletonList(DataStorageMethod.LOCALDB));
-        } else {
-            return new ServiceInfo(Collections.<DataStorageMethod>emptyList());
-        }
-    }
-
-    protected abstract String queueItemToDebugString(QueueEvent queueEvent);
-
-    protected abstract void noteDiscardedItem(QueueEvent queueEvent);
-}

+ 92 - 53
src/main/java/password/pwm/util/queue/SmsQueueManager.java

@@ -39,10 +39,12 @@ import password.pwm.error.*;
 import password.pwm.health.HealthMessage;
 import password.pwm.health.HealthMessage;
 import password.pwm.health.HealthRecord;
 import password.pwm.health.HealthRecord;
 import password.pwm.http.client.PwmHttpClient;
 import password.pwm.http.client.PwmHttpClient;
+import password.pwm.svc.PwmService;
 import password.pwm.svc.stats.Statistic;
 import password.pwm.svc.stats.Statistic;
 import password.pwm.svc.stats.StatisticsManager;
 import password.pwm.svc.stats.StatisticsManager;
 import password.pwm.util.*;
 import password.pwm.util.*;
 import password.pwm.util.localdb.LocalDB;
 import password.pwm.util.localdb.LocalDB;
+import password.pwm.util.localdb.LocalDBStoredQueue;
 import password.pwm.util.logging.PwmLogger;
 import password.pwm.util.logging.PwmLogger;
 import password.pwm.util.secure.PwmRandom;
 import password.pwm.util.secure.PwmRandom;
 
 
@@ -54,7 +56,7 @@ import java.util.regex.Pattern;
 /**
 /**
  * @author Menno Pieters, Jason D. Rivard
  * @author Menno Pieters, Jason D. Rivard
  */
  */
-public class SmsQueueManager extends AbstractQueueManager {
+public class SmsQueueManager implements PwmService {
     private static final PwmLogger LOGGER = PwmLogger.forClass(SmsQueueManager.class);
     private static final PwmLogger LOGGER = PwmLogger.forClass(SmsQueueManager.class);
 
 
 // ------------------------------ FIELDS ------------------------------
 // ------------------------------ FIELDS ------------------------------
@@ -85,38 +87,73 @@ public class SmsQueueManager extends AbstractQueueManager {
 
 
     private SmsSendEngine smsSendEngine;
     private SmsSendEngine smsSendEngine;
 
 
-// --------------------------- CONSTRUCTORS ---------------------------
+    private WorkQueueProcessor<SmsItemBean> workQueueProcessor;
+    private PwmApplication pwmApplication;
+    private STATUS status = STATUS.NEW;
+    private ErrorInformation lastError;
 
 
     public SmsQueueManager() {
     public SmsQueueManager() {
     }
     }
-// ------------------------ INTERFACE METHODS ------------------------
-
-// --------------------- Interface PwmService ---------------------
 
 
     public void init(
     public void init(
             final PwmApplication pwmApplication
             final PwmApplication pwmApplication
     )
     )
             throws PwmException
             throws PwmException
     {
     {
-        super.LOGGER = PwmLogger.forClass(SmsQueueManager.class);
-        final Settings settings = new Settings(
-                new TimeDuration(Long.parseLong(pwmApplication.getConfig().readAppProperty(AppProperty.QUEUE_SMS_MAX_AGE_MS))),
-                new TimeDuration(Long.parseLong(pwmApplication.getConfig().readAppProperty(AppProperty.QUEUE_SMS_RETRY_TIMEOUT_MS))),
-                Integer.parseInt(pwmApplication.getConfig().readAppProperty(AppProperty.QUEUE_SMS_MAX_COUNT)),
-                EmailQueueManager.class.getSimpleName()
-        );
-        super.init(
-                pwmApplication,
-                LocalDB.DB.SMS_QUEUE,
-                settings,
-                PwmApplication.AppAttribute.SMS_ITEM_COUNTER,
-                SmsQueueManager.class.getSimpleName()
-        );
+        status = STATUS.OPENING;
+        this.pwmApplication = pwmApplication;
+        if (pwmApplication.getLocalDB() == null || pwmApplication.getLocalDB().status() != LocalDB.Status.OPEN) {
+            LOGGER.warn("localdb is not open,  will remain closed");
+            status = STATUS.CLOSED;
+            return;
+        }
+
+        final WorkQueueProcessor.Settings settings = new WorkQueueProcessor.Settings();
+        settings.setMaxEvents(Integer.parseInt(pwmApplication.getConfig().readAppProperty(AppProperty.QUEUE_SMS_MAX_COUNT)));
+        settings.setRetryDiscardAge(new TimeDuration(Long.parseLong(pwmApplication.getConfig().readAppProperty(AppProperty.QUEUE_SMS_MAX_AGE_MS))));
+        settings.setRetryInterval(new TimeDuration(Long.parseLong(pwmApplication.getConfig().readAppProperty(AppProperty.QUEUE_SMS_RETRY_TIMEOUT_MS))));
+        final LocalDBStoredQueue localDBStoredQueue = LocalDBStoredQueue.createLocalDBStoredQueue(pwmApplication, pwmApplication.getLocalDB(), LocalDB.DB.SMS_QUEUE);
+
+        workQueueProcessor = new WorkQueueProcessor<>(pwmApplication, localDBStoredQueue, settings, new SmsItemProcessor(), this.getClass());
+
         smsSendEngine = new SmsSendEngine(pwmApplication.getConfig());
         smsSendEngine = new SmsSendEngine(pwmApplication.getConfig());
+
+        status = STATUS.OPEN;
     }
     }
 
 
+    private class SmsItemProcessor implements WorkQueueProcessor.ItemProcessor<SmsItemBean> {
+        @Override
+        public WorkQueueProcessor.ProcessResult process(SmsItemBean workItem) {
+            try {
+                for (final String msgPart : splitMessage(workItem.getMessage())) {
+                    smsSendEngine.sendSms(workItem.getTo(), msgPart);
+                }
+                StatisticsManager.incrementStat(pwmApplication, Statistic.SMS_SEND_SUCCESSES);
+                lastError = null;
+            } catch (PwmUnrecoverableException e) {
+                StatisticsManager.incrementStat(pwmApplication, Statistic.SMS_SEND_DISCARDS);
+                StatisticsManager.incrementStat(pwmApplication, Statistic.SMS_SEND_FAILURES);
+                LOGGER.error("discarding sms message due to permanent failure: " + e.getErrorInformation().toDebugStr());
+                lastError = e.getErrorInformation();
+                return WorkQueueProcessor.ProcessResult.FAILED;
+            } catch (PwmOperationalException e) {
+                StatisticsManager.incrementStat(pwmApplication, Statistic.SMS_SEND_FAILURES);
+                lastError = e.getErrorInformation();
+                return WorkQueueProcessor.ProcessResult.RETRY;
+            }
+
+            return WorkQueueProcessor.ProcessResult.SUCCESS;
+        }
+
+        @Override
+        public String convertToDebugString(SmsItemBean workItem) {
+            final Map<String,Object> debugOutputMap = new LinkedHashMap<>();
 
 
-// -------------------------- OTHER METHODS --------------------------
+            debugOutputMap.put("to", workItem.getTo());
+
+            return JsonUtil.serializeMap(debugOutputMap);
+        }
+    }
 
 
     public void addSmsToQueue(final SmsItemBean smsItem)
     public void addSmsToQueue(final SmsItemBean smsItem)
             throws PwmUnrecoverableException
             throws PwmUnrecoverableException
@@ -127,7 +164,7 @@ public class SmsQueueManager extends AbstractQueueManager {
         }
         }
 
 
         try {
         try {
-            add(smsItem);
+            workQueueProcessor.submit(smsItem);
         } catch (Exception e) {
         } catch (Exception e) {
             LOGGER.error("error writing to LocalDB queue, discarding sms send request: " + e.getMessage());
             LOGGER.error("error writing to LocalDB queue, discarding sms send request: " + e.getMessage());
         }
         }
@@ -158,7 +195,7 @@ public class SmsQueueManager extends AbstractQueueManager {
         return true;
         return true;
     }
     }
 
 
-    protected boolean determineIfItemCanBeDelivered(final SmsItemBean smsItem) {
+    boolean determineIfItemCanBeDelivered(final SmsItemBean smsItem) {
         final Configuration config = pwmApplication.getConfig();
         final Configuration config = pwmApplication.getConfig();
         if (!smsIsConfigured(config)) {
         if (!smsIsConfigured(config)) {
             return false;
             return false;
@@ -177,24 +214,35 @@ public class SmsQueueManager extends AbstractQueueManager {
         return true;
         return true;
     }
     }
 
 
-    void sendItem(final String item) throws PwmOperationalException {
-        final SmsItemBean smsItemBean = JsonUtil.deserialize(item, SmsItemBean.class);
-        try {
-            for (final String msgPart : splitMessage(smsItemBean.getMessage())) {
-                smsSendEngine.sendSms(smsItemBean.getTo(), msgPart);
-            }
-            StatisticsManager.incrementStat(pwmApplication, Statistic.SMS_SEND_SUCCESSES);
-        } catch (PwmUnrecoverableException e) {
-            StatisticsManager.incrementStat(pwmApplication, Statistic.SMS_SEND_FAILURES);
-            LOGGER.error("discarding sms message due to permanent failure: " + e.getErrorInformation().toDebugStr());
+    @Override
+    public STATUS status() {
+        return status;
+    }
+
+    @Override
+    public void close() {
+        if (workQueueProcessor != null) {
+            workQueueProcessor.close();
         }
         }
+        workQueueProcessor = null;
+
+        status = STATUS.CLOSED;
     }
     }
 
 
     @Override
     @Override
-    List<HealthRecord> failureToHealthRecord(FailureInfo failureInfo) {
-        return Collections.singletonList(HealthRecord.forMessage(HealthMessage.SMS_SendFailure, failureInfo.getErrorInformation().toDebugStr()));
+    public List<HealthRecord> healthCheck() {
+        if (lastError != null) {
+            return Collections.singletonList(HealthRecord.forMessage(HealthMessage.SMS_SendFailure, lastError.toDebugStr()));
+        }
+        return null;
     }
     }
 
 
+    @Override
+    public ServiceInfo serviceInfo() {
+        return null;
+    }
+
+
     private List<String> splitMessage(final String input) {
     private List<String> splitMessage(final String input) {
         final int size = (int)pwmApplication.getConfig().readSettingAsLong(PwmSetting.SMS_MAX_TEXT_LENGTH);
         final int size = (int)pwmApplication.getConfig().readSettingAsLong(PwmSetting.SMS_MAX_TEXT_LENGTH);
 
 
@@ -281,7 +329,7 @@ public class SmsQueueManager extends AbstractQueueManager {
         ));
         ));
     }
     }
 
 
-    protected static String formatSmsNumber(final Configuration config, final String smsNumber) {
+    static String formatSmsNumber(final Configuration config, final String smsNumber) {
         final long ccLong = config.readSettingAsLong(PwmSetting.SMS_DEFAULT_COUNTRY_CODE);
         final long ccLong = config.readSettingAsLong(PwmSetting.SMS_DEFAULT_COUNTRY_CODE);
         String countryCodeNumber = "";
         String countryCodeNumber = "";
         if (ccLong > 0) {
         if (ccLong > 0) {
@@ -336,24 +384,7 @@ public class SmsQueueManager extends AbstractQueueManager {
         return returnValue;
         return returnValue;
     }
     }
 
 
-    @Override
-    protected String queueItemToDebugString(QueueEvent queueEvent)
-    {
-        final Map<String,Object> debugOutputMap = new LinkedHashMap<>();
-        debugOutputMap.put("itemID", queueEvent.getItemID());
-        debugOutputMap.put("timestamp", queueEvent.getTimestamp());
-        final SmsItemBean smsItemBean = JsonUtil.deserialize(queueEvent.getItem(), SmsItemBean.class);
 
 
-        debugOutputMap.put("to", smsItemBean.getTo());
-
-        return JsonUtil.serializeMap(debugOutputMap);
-    }
-
-    @Override
-    protected void noteDiscardedItem(QueueEvent queueEvent)
-    {
-        StatisticsManager.incrementStat(pwmApplication, Statistic.SMS_SEND_DISCARDS);
-    }
 
 
     private static class SmsSendEngine {
     private static class SmsSendEngine {
         private static final PwmLogger LOGGER = PwmLogger.forClass(SmsSendEngine.class);
         private static final PwmLogger LOGGER = PwmLogger.forClass(SmsSendEngine.class);
@@ -377,7 +408,6 @@ public class SmsQueueManager extends AbstractQueueManager {
                 throws PwmUnrecoverableException, PwmOperationalException
                 throws PwmUnrecoverableException, PwmOperationalException
         {
         {
             lastResponseBody = null;
             lastResponseBody = null;
-            final long startTime = System.currentTimeMillis();
 
 
             final String gatewayUser = config.readSettingAsString(PwmSetting.SMS_GATEWAY_USER);
             final String gatewayUser = config.readSettingAsString(PwmSetting.SMS_GATEWAY_USER);
             final PasswordData gatewayPass = config.readSettingAsPassword(PwmSetting.SMS_GATEWAY_PASSWORD);
             final PasswordData gatewayPass = config.readSettingAsPassword(PwmSetting.SMS_GATEWAY_PASSWORD);
@@ -494,4 +524,13 @@ public class SmsQueueManager extends AbstractQueueManager {
         smsSendEngine.sendSms(smsItemBean.getTo(), smsItemBean.getMessage());
         smsSendEngine.sendSms(smsItemBean.getTo(), smsItemBean.getMessage());
         return smsSendEngine.getLastResponseBody();
         return smsSendEngine.getLastResponseBody();
     }
     }
+
+    public int queueSize() {
+        return workQueueProcessor.queueSize();
+    }
+
+    public Date eldestItem() {
+        return workQueueProcessor.eldestItem();
+    }
+
 }
 }

+ 23 - 23
src/main/webapp/WEB-INF/jsp/admin-analysis.jsp

@@ -65,6 +65,29 @@
         <div data-dojo-type="dijit.layout.TabContainer" style="width: 100%; height: 100%;"  data-dojo-props="doLayout: false, persist: true" id="analysis-topLevelTab">
         <div data-dojo-type="dijit.layout.TabContainer" style="width: 100%; height: 100%;"  data-dojo-props="doLayout: false, persist: true" id="analysis-topLevelTab">
             <div data-dojo-type="dijit.layout.TabContainer" style="width: 100%; height: 100%;" data-dojo-props="doLayout: false, persist: true" title="<pwm:display key="Title_DirectoryReporting" bundle="Admin"/>">
             <div data-dojo-type="dijit.layout.TabContainer" style="width: 100%; height: 100%;" data-dojo-props="doLayout: false, persist: true" title="<pwm:display key="Title_DirectoryReporting" bundle="Admin"/>">
                 <% if (analysis_pwmRequest.getConfig().readSettingAsBoolean(PwmSetting.REPORTING_ENABLE)) { %>
                 <% if (analysis_pwmRequest.getConfig().readSettingAsBoolean(PwmSetting.REPORTING_ENABLE)) { %>
+                <div data-dojo-type="dijit.layout.ContentPane" title="<pwm:display key="Title_ReportEngineStatus" bundle="Admin"/>" class="tabContent">
+                    <table style="width:450px" id="statusTable">
+                        <tr><td><pwm:display key="Display_PleaseWait"/></td></tr>
+                    </table>
+                    <table style="width:450px;">
+                        <tr><td style="text-align: center; cursor: pointer">
+                            <button id="reportStartButton" class="btn">
+                                <pwm:if test="<%=PwmIfTest.showIcons%>"><span class="btn-icon pwm-icon pwm-icon-play">&nbsp;</span></pwm:if>
+                                <pwm:display key="Button_Report_Start" bundle="Admin"/>
+                            </button>
+                            &nbsp;&nbsp;
+                            <button id="reportStopButton" class="btn">
+                                <pwm:if test="<%=PwmIfTest.showIcons%>"><span class="btn-icon pwm-icon pwm-icon-stop">&nbsp;</span></pwm:if>
+                                <pwm:display key="Button_Report_Stop" bundle="Admin"/>
+                            </button>
+                            &nbsp;&nbsp;
+                            <button id="reportClearButton" class="btn">
+                                <pwm:if test="<%=PwmIfTest.showIcons%>"><span class="btn-icon pwm-icon pwm-icon-trash-o">&nbsp;</span></pwm:if>
+                                <pwm:display key="Button_Report_Clear" bundle="Admin"/>
+                            </button>
+                        </td></tr>
+                    </table>
+                </div>
                 <div data-dojo-type="dijit.layout.ContentPane" title="Summary" class="tabContent">
                 <div data-dojo-type="dijit.layout.ContentPane" title="Summary" class="tabContent">
                     <div style="max-height: 500px; overflow-y: auto" id="summaryTableWrapper">
                     <div style="max-height: 500px; overflow-y: auto" id="summaryTableWrapper">
                         <table id="summaryTable">
                         <table id="summaryTable">
@@ -127,29 +150,6 @@
                         </script>
                         </script>
                     </pwm:script>
                     </pwm:script>
                 </div>
                 </div>
-                <div data-dojo-type="dijit.layout.ContentPane" title="<pwm:display key="Title_ReportEngineStatus" bundle="Admin"/>" class="tabContent">
-                    <table style="width:450px" id="statusTable">
-                        <tr><td><pwm:display key="Display_PleaseWait"/></td></tr>
-                    </table>
-                    <table style="width:450px;">
-                        <tr><td style="text-align: center; cursor: pointer">
-                            <button id="reportStartButton" class="btn">
-                                <pwm:if test="<%=PwmIfTest.showIcons%>"><span class="btn-icon pwm-icon pwm-icon-play">&nbsp;</span></pwm:if>
-                                <pwm:display key="Button_Report_Start" bundle="Admin"/>
-                            </button>
-                            &nbsp;&nbsp;
-                            <button id="reportStopButton" class="btn">
-                                <pwm:if test="<%=PwmIfTest.showIcons%>"><span class="btn-icon pwm-icon pwm-icon-stop">&nbsp;</span></pwm:if>
-                                <pwm:display key="Button_Report_Stop" bundle="Admin"/>
-                            </button>
-                            &nbsp;&nbsp;
-                            <button id="reportClearButton" class="btn">
-                                <pwm:if test="<%=PwmIfTest.showIcons%>"><span class="btn-icon pwm-icon pwm-icon-trash-o">&nbsp;</span></pwm:if>
-                                <pwm:display key="Button_Report_Clear" bundle="Admin"/>
-                            </button>
-                        </td></tr>
-                    </table>
-                </div>
                 <% } else { %>
                 <% } else { %>
                 <div>
                 <div>
                     <%= PwmError.ERROR_SERVICE_NOT_AVAILABLE.getLocalizedMessage(analysis_pwmRequest.getLocale(),
                     <%= PwmError.ERROR_SERVICE_NOT_AVAILABLE.getLocalizedMessage(analysis_pwmRequest.getLocale(),