浏览代码

refactor emailqueue service to use workqueueprocessor

Jason Rivard 9 年之前
父节点
当前提交
7a6814d8d7

+ 1 - 0
src/main/java/password/pwm/error/PwmError.java

@@ -161,6 +161,7 @@ public enum PwmError {
     ERROR_STARTUP_ERROR(            5082, "Error_StartupError",             null, ErrorFlag.Permanent),
     ERROR_ENVIRONMENT_ERROR(        5083, "Error_EnvironmentError",         null, ErrorFlag.Permanent),
     ERROR_APPLICATION_NOT_RUNNING(  5084, "Error_ApplicationNotRunning",    null, ErrorFlag.Permanent),
+    ERROR_EMAIL_SEND_FAILURE(       5085, "Error_EmailSendFailure",         null, ErrorFlag.Permanent),
 
     ERROR_REMOTE_ERROR_VALUE(       6000, "Error_RemoteErrorValue",         null, ErrorFlag.Permanent),
 

+ 6 - 1
src/main/java/password/pwm/http/filter/RequestInitializationFilter.java

@@ -164,7 +164,12 @@ public class RequestInitializationFilter implements Filter {
             }
 
         } catch (Throwable e) {
-            LOGGER.error("can't init request: " + e.getMessage(),e);
+            final String logMsg = "can't init request: " + e.getMessage();
+            if (e instanceof PwmException && ((PwmException) e).getError() != PwmError.ERROR_UNKNOWN) {
+                LOGGER.error(logMsg);
+            } else {
+                LOGGER.error(logMsg,e);
+            }
             if (!(new PwmURL(req).isResourceURL())) {
                 ErrorInformation errorInformation = new ErrorInformation(PwmError.ERROR_APP_UNAVAILABLE);
                 try {

+ 128 - 75
src/main/java/password/pwm/util/queue/EmailQueueManager.java

@@ -24,24 +24,25 @@ package password.pwm.util.queue;
 
 import password.pwm.AppProperty;
 import password.pwm.PwmApplication;
+import password.pwm.PwmApplicationMode;
 import password.pwm.PwmConstants;
 import password.pwm.bean.EmailItemBean;
 import password.pwm.bean.UserInfoBean;
 import password.pwm.config.Configuration;
 import password.pwm.config.PwmSetting;
+import password.pwm.config.option.DataStorageMethod;
+import password.pwm.error.ErrorInformation;
 import password.pwm.error.PwmError;
 import password.pwm.error.PwmException;
 import password.pwm.error.PwmOperationalException;
-import password.pwm.error.PwmUnrecoverableException;
 import password.pwm.health.HealthMessage;
 import password.pwm.health.HealthRecord;
+import password.pwm.svc.PwmService;
 import password.pwm.svc.stats.Statistic;
 import password.pwm.svc.stats.StatisticsManager;
-import password.pwm.util.JsonUtil;
-import password.pwm.util.PasswordData;
-import password.pwm.util.StringUtil;
-import password.pwm.util.TimeDuration;
+import password.pwm.util.*;
 import password.pwm.util.localdb.LocalDB;
+import password.pwm.util.localdb.LocalDBStoredQueue;
 import password.pwm.util.logging.PwmLogger;
 import password.pwm.util.macro.MacroMachine;
 
@@ -49,52 +50,103 @@ import javax.mail.Message;
 import javax.mail.MessagingException;
 import javax.mail.Transport;
 import javax.mail.internet.*;
-
+import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.util.*;
 
 /**
  * @author Jason D. Rivard
  */
-public class
-        EmailQueueManager extends AbstractQueueManager {
-// ------------------------------ FIELDS ------------------------------
+public class EmailQueueManager implements PwmService {
+
+    private static final PwmLogger LOGGER = PwmLogger.forClass(EmailQueueManager.class);
 
+    private PwmApplication pwmApplication;
     private Properties javaMailProps = new Properties();
+    private WorkQueueProcessor<EmailItemBean> workQueueProcessor;
 
-// --------------------------- CONSTRUCTORS ---------------------------
+    private PwmService.STATUS status = STATUS.NEW;
+    private ErrorInformation lastError;
 
-    public EmailQueueManager() {
+    public void init(final PwmApplication pwmApplication)
+            throws PwmException
+    {
+        status = STATUS.OPENING;
+        this.pwmApplication = pwmApplication;
+        javaMailProps = makeJavaMailProps(pwmApplication.getConfig());
+        final WorkQueueProcessor.Settings settings = new WorkQueueProcessor.Settings();
+        settings.setMaxEvents(Integer.parseInt(pwmApplication.getConfig().readAppProperty(AppProperty.QUEUE_EMAIL_MAX_COUNT)));
+        settings.setRetryDiscardAge(new TimeDuration(Long.parseLong(pwmApplication.getConfig().readAppProperty(AppProperty.QUEUE_EMAIL_MAX_AGE_MS))));
+        settings.setRetryInterval(new TimeDuration(Long.parseLong(pwmApplication.getConfig().readAppProperty(AppProperty.QUEUE_EMAIL_RETRY_TIMEOUT_MS))));
+        final LocalDBStoredQueue localDBStoredQueue = LocalDBStoredQueue.createLocalDBStoredQueue(pwmApplication, pwmApplication.getLocalDB(), LocalDB.DB.EMAIL_QUEUE);
+
+        workQueueProcessor = new WorkQueueProcessor<>(pwmApplication, localDBStoredQueue, settings, new EmailItemProcessor(), this.getClass());
+        status = STATUS.OPEN;
     }
 
-// ------------------------ INTERFACE METHODS ------------------------
+    public void close() {
+        status = STATUS.CLOSED;
+        workQueueProcessor.close();
+    }
 
+    @Override
+    public STATUS status() {
+        return status;
+    }
 
-// --------------------- Interface PwmService ---------------------
+    public List<HealthRecord> healthCheck() {
+        if (pwmApplication.getLocalDB() == null || pwmApplication.getLocalDB().status() != LocalDB.Status.OPEN) {
+            return Collections.singletonList(HealthRecord.forMessage(HealthMessage.ServiceClosed_LocalDBUnavail, this.getClass().getSimpleName()));
+        }
 
-    public void init(final PwmApplication pwmApplication)
-            throws PwmException
-    {
-        LOGGER = PwmLogger.forClass(EmailQueueManager.class);
-        javaMailProps = makeJavaMailProps(pwmApplication.getConfig());
-        final Settings settings = new Settings(
-                new TimeDuration(Long.parseLong(pwmApplication.getConfig().readAppProperty(AppProperty.QUEUE_EMAIL_MAX_AGE_MS))),
-                new TimeDuration(Long.parseLong(pwmApplication.getConfig().readAppProperty(AppProperty.QUEUE_EMAIL_RETRY_TIMEOUT_MS))),
-                Integer.parseInt(pwmApplication.getConfig().readAppProperty(AppProperty.QUEUE_EMAIL_MAX_COUNT)),
-                EmailQueueManager.class.getSimpleName()
-        );
-        super.init(
-                pwmApplication,
-                LocalDB.DB.EMAIL_QUEUE,
-                settings,
-                PwmApplication.AppAttribute.EMAIL_ITEM_COUNTER,
-                EmailQueueManager.class.getSimpleName()
-        );
+        if (pwmApplication.getApplicationMode() == PwmApplicationMode.READ_ONLY) {
+            return Collections.singletonList(HealthRecord.forMessage(HealthMessage.ServiceClosed_AppReadOnly, this.getClass().getSimpleName()));
+        }
+
+        if (lastError != null) {
+            return Collections.singletonList(HealthRecord.forMessage(HealthMessage.Email_SendFailure, lastError.toDebugStr()));
+        }
+
+        return Collections.emptyList();
     }
 
-// -------------------------- OTHER METHODS --------------------------
+    @Override
+    public ServiceInfo serviceInfo() {
+        if (status() == STATUS.OPEN) {
+            return new ServiceInfo(Collections.singletonList(DataStorageMethod.LOCALDB));
+        } else {
+            return new ServiceInfo(Collections.<DataStorageMethod>emptyList());
+        }
+    }
+
+    public int queueSize() {
+        return workQueueProcessor.queueSize();
+    }
 
-    protected boolean determineIfItemCanBeDelivered(final EmailItemBean emailItem) {
+    public Date eldestItem() {
+        return workQueueProcessor.eldestItem();
+    }
+
+    private class EmailItemProcessor implements  WorkQueueProcessor.ItemProcessor<EmailItemBean>  {
+        @Override
+        public WorkQueueProcessor.ProcessResult process(EmailItemBean workItem) {
+                return sendItem(workItem);
+        }
+
+        public String convertToDebugString(EmailItemBean emailItemBean) {
+            return emailItemToDebugString(emailItemBean);
+        }
+    }
+
+    private static String emailItemToDebugString(EmailItemBean emailItemBean) {
+        final Map<String,Object> debugOutputMap = new LinkedHashMap<>();
+        debugOutputMap.put("to", emailItemBean.getTo());
+        debugOutputMap.put("from", emailItemBean.getFrom());
+        debugOutputMap.put("subject", emailItemBean.getSubject());
+        return JsonUtil.serializeMap(debugOutputMap);
+    }
+
+    private boolean determineIfItemCanBeDelivered(final EmailItemBean emailItem) {
         final String serverAddress = pwmApplication.getConfig().readSettingAsString(PwmSetting.EMAIL_SERVER_ADDRESS);
 
         if (serverAddress == null || serverAddress.length() < 1) {
@@ -155,24 +207,23 @@ public class
         }
 
         try {
-            add(workingItemBean);
-        } catch (PwmUnrecoverableException e) {
+            workQueueProcessor.submit(workingItemBean);
+        } catch (PwmOperationalException e) {
             LOGGER.warn("unable to add email to queue: " + e.getMessage());
         }
     }
 
-    void sendItem(final String item) throws PwmOperationalException {
-        final EmailItemBean emailItemBean = JsonUtil.deserialize(item, EmailItemBean.class);
+    private WorkQueueProcessor.ProcessResult sendItem(final EmailItemBean emailItemBean) {
 
         // create a new MimeMessage object (using the Session created above)
         try {
             final List<Message> messages = convertEmailItemToMessages(emailItemBean, this.pwmApplication.getConfig());
-            final String mailuser = this.pwmApplication.getConfig().readSettingAsString(PwmSetting.EMAIL_USERNAME);
-            final PasswordData mailpassword = this.pwmApplication.getConfig().readSettingAsPassword(PwmSetting.EMAIL_PASSWORD);
+            final String mailUser = this.pwmApplication.getConfig().readSettingAsString(PwmSetting.EMAIL_USERNAME);
+            final PasswordData mailPassword = this.pwmApplication.getConfig().readSettingAsPassword(PwmSetting.EMAIL_PASSWORD);
 
             // Login to SMTP server first if both username and password is given
             final String logText;
-            if (mailuser == null || mailuser.length() < 1 || mailpassword == null) {
+            if (mailUser == null || mailUser.length() < 1 || mailPassword == null) {
 
                 logText = "plaintext";
 
@@ -187,7 +238,7 @@ public class
                 final int mailport = (int)this.pwmApplication.getConfig().readSettingAsLong(PwmSetting.EMAIL_SERVER_PORT);
 
                 final Transport tr = session.getTransport("smtp");
-                tr.connect(mailhost, mailport, mailuser, mailpassword.getStringValue());
+                tr.connect(mailhost, mailport, mailUser, mailPassword.getStringValue());
 
                 for (Message message : messages) {
                     message.saveChanges();
@@ -196,32 +247,43 @@ public class
 
                 tr.close();
                 logText = "authenticated ";
+                lastError = null;
             }
 
             LOGGER.debug("successfully sent " + logText + "email: " + emailItemBean.toString());
             StatisticsManager.incrementStat(pwmApplication, Statistic.EMAIL_SEND_SUCCESSES);
-
+            return WorkQueueProcessor.ProcessResult.SUCCESS;
         } catch (Exception e) {
-            LOGGER.error("error during email send attempt: " + e);
+
+            final ErrorInformation errorInformation;
+            if (e instanceof PwmException) {
+                errorInformation = ((PwmException) e).getErrorInformation();
+            } else {
+                final String errorMsg = "error sending email: " + e.getMessage();
+                errorInformation = new ErrorInformation(
+                        PwmError.ERROR_EMAIL_SEND_FAILURE,
+                        errorMsg,
+                        new String[]{ emailItemToDebugString(emailItemBean), Helper.readHostileExceptionMessage(e)}
+                );
+            }
+            LOGGER.error(errorInformation);
 
             if (sendIsRetryable(e)) {
                 LOGGER.error("error sending email (" + e.getMessage() + ") " + emailItemBean.toString() + ", will retry");
                 StatisticsManager.incrementStat(pwmApplication, Statistic.EMAIL_SEND_FAILURES);
-                throw new PwmOperationalException(PwmError.ERROR_UNKNOWN,e.getMessage());
+                return WorkQueueProcessor.ProcessResult.RETRY;
             } else {
                 LOGGER.error(
                         "error sending email (" + e.getMessage() + ") " + emailItemBean.toString() + ", permanent failure, discarding message");
                 StatisticsManager.incrementStat(pwmApplication, Statistic.EMAIL_SEND_DISCARDS);
+                return WorkQueueProcessor.ProcessResult.FAILED;
             }
         }
     }
 
-    @Override
-    List<HealthRecord> failureToHealthRecord(FailureInfo failureInfo) {
-        return Collections.singletonList(HealthRecord.forMessage(HealthMessage.Email_SendFailure, failureInfo.getErrorInformation().toDebugStr()));
-    }
-
-    protected List<Message> convertEmailItemToMessages(final EmailItemBean emailItemBean, final Configuration config) throws MessagingException {
+    public List<Message> convertEmailItemToMessages(final EmailItemBean emailItemBean, final Configuration config)
+            throws MessagingException
+    {
         final List<Message> messages = new ArrayList<>();
         final boolean hasPlainText = emailItemBean.getBodyPlain() != null && emailItemBean.getBodyPlain().length() > 0;
         final boolean hasHtml = emailItemBean.getBodyHtml() != null && emailItemBean.getBodyHtml().length() > 0;
@@ -269,7 +331,7 @@ public class
         return messages;
     }
 
-    protected static Properties makeJavaMailProps(final Configuration config) {
+    private static Properties makeJavaMailProps(final Configuration config) {
         //Create a properties item to start setting up the mail
         final Properties props = new Properties();
 
@@ -288,7 +350,7 @@ public class
         return props;
     }
 
-    protected InternetAddress makeInternetAddress(final String input)
+    private static InternetAddress makeInternetAddress(final String input)
             throws AddressException
     {
         if (input == null) {
@@ -313,28 +375,7 @@ public class
         return new InternetAddress(input);
     }
 
-    @Override
-    protected String queueItemToDebugString(QueueEvent queueEvent)
-    {
-        final Map<String,Object> debugOutputMap = new LinkedHashMap<>();
-        debugOutputMap.put("itemID", queueEvent.getItemID());
-        debugOutputMap.put("timestamp", queueEvent.getTimestamp());
-        final EmailItemBean emailItemBean = JsonUtil.deserialize(queueEvent.getItem(), EmailItemBean.class);
-
-        debugOutputMap.put("to", emailItemBean.getTo());
-        debugOutputMap.put("from", emailItemBean.getFrom());
-        debugOutputMap.put("subject", emailItemBean.getSubject());
-
-        return JsonUtil.serializeMap(debugOutputMap);
-    }
-
-    @Override
-    protected void noteDiscardedItem(QueueEvent queueEvent)
-    {
-        StatisticsManager.incrementStat(pwmApplication, Statistic.EMAIL_SEND_DISCARDS);
-    }
-
-    public static EmailItemBean applyMacrosToEmail(final EmailItemBean emailItem, final MacroMachine macroMachine) {
+    private static EmailItemBean applyMacrosToEmail(final EmailItemBean emailItem, final MacroMachine macroMachine) {
         final EmailItemBean expandedEmailItem;
         expandedEmailItem = new EmailItemBean(
                 macroMachine.expandMacros(emailItem.getTo()),
@@ -346,7 +387,7 @@ public class
         return expandedEmailItem;
     }
 
-    public static EmailItemBean newEmailToAddress(final EmailItemBean emailItem, final String toAddress) {
+    private static EmailItemBean newEmailToAddress(final EmailItemBean emailItem, final String toAddress) {
         final EmailItemBean expandedEmailItem;
         expandedEmailItem = new EmailItemBean(
                 toAddress,
@@ -357,5 +398,17 @@ public class
         );
         return expandedEmailItem;
     }
+
+    private static boolean sendIsRetryable(final Exception e) {
+        if (e != null) {
+            final Throwable cause = e.getCause();
+            if (cause instanceof IOException) {
+                LOGGER.trace("message send failure cause is due to an IOException: " + e.getMessage());
+                return true;
+            }
+        }
+        return false;
+    }
+
 }
 

+ 1 - 0
src/main/resources/password/pwm/i18n/Error.properties

@@ -157,6 +157,7 @@ Error_NoProfileAssigned=No profile is assigned for this operation.
 Error_StartupError=An error occurred while starting the application.  Check the log files for information.
 Error_EnvironmentError=An error with the application environment has prevented the application from starting.
 Error_ApplicationNotRunning=This functionality is not available until the application configuration is restricted.
+Error_EmailSendFailure=Error sending email item %1%, error: %2%
 
 Error_RemoteErrorValue=Remote Error: %1%