소스 검색

improve email connection management

Jason Rivard 5 년 전
부모
커밋
0a05f26fa1

+ 2 - 0
server/src/main/java/password/pwm/AppProperty.java

@@ -303,6 +303,8 @@ public enum AppProperty
     QUEUE_EMAIL_RETRY_TIMEOUT_MS                    ( "queue.email.retryTimeoutMs" ),
     QUEUE_EMAIL_RETRY_TIMEOUT_MS                    ( "queue.email.retryTimeoutMs" ),
     QUEUE_EMAIL_MAX_COUNT                           ( "queue.email.maxCount" ),
     QUEUE_EMAIL_MAX_COUNT                           ( "queue.email.maxCount" ),
     QUEUE_EMAIL_MAX_THREADS                         ( "queue.email.maxThreads" ),
     QUEUE_EMAIL_MAX_THREADS                         ( "queue.email.maxThreads" ),
+    QUEUE_EMAIL_MAX_ITEMS_PER_CONNECTION            ( "queue.email.maxItemsPerConnection" ),
+    QUEUE_EMAIL_MAX_SECONDS_PER_CONNECTION          ( "queue.email.maxSecondsPerConnection" ),
     QUEUE_SMS_RETRY_TIMEOUT_MS                      ( "queue.sms.retryTimeoutMs" ),
     QUEUE_SMS_RETRY_TIMEOUT_MS                      ( "queue.sms.retryTimeoutMs" ),
     QUEUE_SMS_MAX_COUNT                             ( "queue.sms.maxCount" ),
     QUEUE_SMS_MAX_COUNT                             ( "queue.sms.maxCount" ),
     QUEUE_SYSLOG_RETRY_TIMEOUT_MS                   ( "queue.syslog.retryTimeoutMs" ),
     QUEUE_SYSLOG_RETRY_TIMEOUT_MS                   ( "queue.syslog.retryTimeoutMs" ),

+ 1 - 0
server/src/main/java/password/pwm/health/HealthMessage.java

@@ -42,6 +42,7 @@ public enum HealthMessage
     LDAP_TestUserReadPwError( HealthStatus.WARN, HealthTopic.LDAP ),
     LDAP_TestUserReadPwError( HealthStatus.WARN, HealthTopic.LDAP ),
     LDAP_TestUserOK( HealthStatus.GOOD, HealthTopic.LDAP ),
     LDAP_TestUserOK( HealthStatus.GOOD, HealthTopic.LDAP ),
     Email_SendFailure( HealthStatus.WARN, HealthTopic.Email ),
     Email_SendFailure( HealthStatus.WARN, HealthTopic.Email ),
+    Email_ConnectFailure( HealthStatus.WARN, HealthTopic.Email ),
     PwNotify_Failure( HealthStatus.WARN, HealthTopic.Application ),
     PwNotify_Failure( HealthStatus.WARN, HealthTopic.Application ),
     MissingResource( HealthStatus.DEBUG, HealthTopic.Integrity ),
     MissingResource( HealthStatus.DEBUG, HealthTopic.Integrity ),
     BrokenMethod( HealthStatus.DEBUG, HealthTopic.Integrity ),
     BrokenMethod( HealthStatus.DEBUG, HealthTopic.Integrity ),

+ 63 - 2
server/src/main/java/password/pwm/svc/email/EmailConnection.java

@@ -20,13 +20,74 @@
 
 
 package password.pwm.svc.email;
 package password.pwm.svc.email;
 
 
-import lombok.Value;
+import password.pwm.util.java.AtomicLoopIntIncrementer;
+import password.pwm.util.logging.PwmLogger;
 
 
+import javax.mail.MessagingException;
 import javax.mail.Transport;
 import javax.mail.Transport;
+import java.time.Instant;
 
 
-@Value
 class EmailConnection
 class EmailConnection
 {
 {
+    private static final PwmLogger LOGGER = PwmLogger.forClass( EmailConnection.class );
+
     private final EmailServer emailServer;
     private final EmailServer emailServer;
     private final Transport transport;
     private final Transport transport;
+    private final AtomicLoopIntIncrementer sentItems = new AtomicLoopIntIncrementer( );
+    private final Instant startTime = Instant.now();
+    private final String id;
+
+    private static final AtomicLoopIntIncrementer ID_COUNTER = new AtomicLoopIntIncrementer();
+
+    EmailConnection( final EmailServer emailServer, final Transport transport )
+    {
+        this.emailServer = emailServer;
+        this.transport = transport;
+        this.id = String.valueOf( ID_COUNTER.next() );
+    }
+
+    public EmailServer getEmailServer()
+    {
+        return emailServer;
+    }
+
+    public int getSentItems()
+    {
+        return sentItems.get();
+    }
+
+    public Transport getTransport()
+    {
+        return transport;
+    }
+
+    public void incrementSentItems()
+    {
+        sentItems.next();
+    }
+
+    public Instant getStartTime()
+    {
+        return startTime;
+    }
+
+    public String getId()
+    {
+        return id;
+    }
+
+    public void close()
+    {
+        if ( getTransport() != null )
+        {
+            try
+            {
+                 getTransport().close();
+            }
+            catch ( final MessagingException e )
+            {
+               LOGGER.debug( () -> "error closing connection: " + e.getMessage() );
+            }
+        }
+    }
 }
 }

+ 219 - 0
server/src/main/java/password/pwm/svc/email/EmailConnectionPool.java

@@ -0,0 +1,219 @@
+/*
+ * Password Management Servlets (PWM)
+ * http://www.pwm-project.org
+ *
+ * Copyright (c) 2006-2009 Novell, Inc.
+ * Copyright (c) 2009-2020 The PWM Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package password.pwm.svc.email;
+
+import password.pwm.error.ErrorInformation;
+import password.pwm.error.PwmError;
+import password.pwm.error.PwmUnrecoverableException;
+import password.pwm.util.java.AtomicLoopIntIncrementer;
+import password.pwm.util.java.JavaHelper;
+import password.pwm.util.java.TimeDuration;
+import password.pwm.util.logging.PwmLogger;
+
+import javax.mail.Transport;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class EmailConnectionPool
+{
+    private static final PwmLogger LOGGER = PwmLogger.forClass( EmailConnectionPool.class );
+
+    private final Set<EmailConnection> connections = Collections.newSetFromMap( new ConcurrentHashMap<>() );
+    private final Lock lock = new ReentrantLock();
+
+    private final EmailServiceSettings settings;
+    private final List<EmailServer> servers;
+
+    private final AtomicInteger activeConnectionCounter = new AtomicInteger();
+    private final AtomicBoolean closed = new AtomicBoolean( false );
+    private final AtomicLoopIntIncrementer serverIncrementer;
+
+    public EmailConnectionPool(
+            final List<EmailServer> servers,
+            final EmailServiceSettings settings )
+    {
+        this.servers = Collections.unmodifiableList( new ArrayList<>( servers ) );
+        this.serverIncrementer = AtomicLoopIntIncrementer.builder().ceiling( servers.size() ).build();
+        this.settings = settings;
+    }
+
+    public int idleConnectionCount()
+    {
+        return connections.size();
+    }
+
+    public int activeConnectionCount()
+    {
+        return activeConnectionCounter.get();
+    }
+
+    public List<EmailServer> getServers()
+    {
+        return servers;
+    }
+
+    public EmailConnection getConnection()
+            throws PwmUnrecoverableException
+    {
+        if ( closed.get() )
+        {
+            throw new PwmUnrecoverableException( PwmError.ERROR_SERVICE_NOT_AVAILABLE, "email connection pool is closed" );
+        }
+
+        lock.lock();
+        try
+        {
+            while  ( !connections.isEmpty() )
+            {
+                final EmailConnection emailConnection = connections.iterator().next();
+                connections.remove( emailConnection );
+                if ( connectionStillValid( emailConnection ) )
+                {
+                    activeConnectionCounter.incrementAndGet();
+                    return emailConnection;
+                }
+                emailConnection.close();
+            }
+            final Instant startTime = Instant.now();
+            final EmailConnection emailConnection = getSmtpTransport();
+            LOGGER.trace( () -> "created new email connection " + emailConnection.getId()
+                            + " to " + emailConnection.getEmailServer().getId(),
+                    () -> TimeDuration.fromCurrent( startTime ) );
+            activeConnectionCounter.incrementAndGet();
+            return emailConnection;
+        }
+        finally
+        {
+            lock.unlock();
+        }
+    }
+
+    public void returnEmailConnection( final EmailConnection emailConnection )
+    {
+        lock.lock();
+        try
+        {
+
+            if ( connections.add( emailConnection ) )
+            {
+                activeConnectionCounter.decrementAndGet();
+            }
+            else
+            {
+                LOGGER.warn( () -> "connection " + emailConnection.getId() + "returned but was already in oool" );
+            }
+        }
+        finally
+        {
+            lock.unlock();
+        }
+    }
+
+    private boolean connectionStillValid( final EmailConnection emailConnection )
+    {
+        if ( emailConnection.getSentItems() >= settings.getConnectionSendItemLimit() )
+        {
+            LOGGER.trace( () -> "email connection " + emailConnection.getId() + " has sent " + emailConnection.getSentItems() + " and will be retired" );
+            return false;
+        }
+
+        final TimeDuration connectionAge = TimeDuration.fromCurrent( emailConnection.getStartTime() );
+        if ( connectionAge.isLongerThan( settings.getConnectionSendItemDuration() ) )
+        {
+            LOGGER.trace( () -> "email connection " + emailConnection.getId() + " has lived " + connectionAge.asCompactString() + " and will be retired" );
+            return false;
+        }
+
+        if ( !emailConnection.getTransport().isConnected() )
+        {
+            LOGGER.trace( () -> "email connection " + emailConnection.getId() + " is no longer connected " + connectionAge.asCompactString() + " and will be retired" );
+            return false;
+
+        }
+
+        return true;
+    }
+
+    private EmailConnection getSmtpTransport( )
+            throws PwmUnrecoverableException
+    {
+        final int serverCount = servers.size();
+
+        // the global server incrementer rotates the server list by 1 offset each attempt to get an smtp transport.
+        int nextSlot = serverIncrementer.next();
+
+        for ( int i = 0; i < serverCount; i++ )
+        {
+            if ( nextSlot >= serverCount )
+            {
+                nextSlot -= serverCount;
+            }
+
+            final EmailServer server = servers.get( nextSlot );
+            try
+            {
+                final Transport transport = EmailServerUtil.makeSmtpTransport( server );
+                server.getConnectionStats().increment( EmailServer.ServerStat.newConnections );
+                return new EmailConnection( server, transport );
+            }
+            catch ( final Exception e )
+            {
+                final String exceptionMsg = JavaHelper.readHostileExceptionMessage( e );
+                final String msg = "unable to connect to email server '" + server.toDebugString() + "', error: " + exceptionMsg;
+                final ErrorInformation errorInformation = new ErrorInformation( PwmError.ERROR_SERVICE_UNREACHABLE, msg );
+                server.getConnectionStats().increment( EmailServer.ServerStat.failedConnections );
+                LOGGER.warn( errorInformation::toDebugStr );
+            }
+
+            nextSlot++;
+        }
+
+        throw PwmUnrecoverableException.newException( PwmError.ERROR_SERVICE_UNREACHABLE, "unable to reach any configured email server" );
+    }
+
+
+    public void close()
+    {
+        closed.set( true );
+        lock.lock();
+        try
+        {
+            for ( final EmailConnection emailConnection : connections )
+            {
+                emailConnection.close();
+            }
+            connections.clear();
+        }
+        finally
+        {
+            lock.unlock();
+        }
+
+    }
+}

+ 18 - 0
server/src/main/java/password/pwm/svc/email/EmailServer.java

@@ -23,12 +23,17 @@ package password.pwm.svc.email;
 import lombok.Builder;
 import lombok.Builder;
 import lombok.Value;
 import lombok.Value;
 import password.pwm.config.option.SmtpServerType;
 import password.pwm.config.option.SmtpServerType;
+import password.pwm.error.ErrorInformation;
 import password.pwm.util.PasswordData;
 import password.pwm.util.PasswordData;
+import password.pwm.util.java.MovingAverage;
+import password.pwm.util.java.StatisticIntCounterMap;
 import password.pwm.util.java.StringUtil;
 import password.pwm.util.java.StringUtil;
+import password.pwm.util.java.TimeDuration;
 
 
 import java.util.LinkedHashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
 
 
 @Value
 @Value
 @Builder
 @Builder
@@ -43,6 +48,19 @@ public class EmailServer
     private javax.mail.Session session;
     private javax.mail.Session session;
     private SmtpServerType type;
     private SmtpServerType type;
 
 
+    private final StatisticIntCounterMap<ServerStat> connectionStats = new StatisticIntCounterMap<>( ServerStat.class );
+    private final MovingAverage averageSendTime = new MovingAverage( TimeDuration.MINUTE );
+    private final AtomicReference<ErrorInformation> lastConnectError = new AtomicReference();
+
+
+    enum ServerStat
+    {
+        sendCount,
+        sendFailures,
+        newConnections,
+        failedConnections,
+    }
+
     public String toDebugString()
     public String toDebugString()
     {
     {
         final Map<String, String> debugProps = new LinkedHashMap<>(  );
         final Map<String, String> debugProps = new LinkedHashMap<>(  );

+ 27 - 1
server/src/main/java/password/pwm/svc/email/EmailServerUtil.java

@@ -32,10 +32,13 @@ import password.pwm.config.option.SmtpServerType;
 import password.pwm.config.profile.EmailServerProfile;
 import password.pwm.config.profile.EmailServerProfile;
 import password.pwm.error.PwmError;
 import password.pwm.error.PwmError;
 import password.pwm.error.PwmUnrecoverableException;
 import password.pwm.error.PwmUnrecoverableException;
+import password.pwm.health.HealthMessage;
+import password.pwm.health.HealthRecord;
 import password.pwm.http.HttpContentType;
 import password.pwm.http.HttpContentType;
 import password.pwm.util.PasswordData;
 import password.pwm.util.PasswordData;
 import password.pwm.util.java.JavaHelper;
 import password.pwm.util.java.JavaHelper;
 import password.pwm.util.java.StringUtil;
 import password.pwm.util.java.StringUtil;
+import password.pwm.util.java.TimeDuration;
 import password.pwm.util.logging.PwmLogger;
 import password.pwm.util.logging.PwmLogger;
 import password.pwm.util.macro.MacroMachine;
 import password.pwm.util.macro.MacroMachine;
 import password.pwm.util.secure.PwmTrustManager;
 import password.pwm.util.secure.PwmTrustManager;
@@ -54,6 +57,7 @@ import javax.net.ssl.TrustManager;
 import java.io.IOException;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.io.UnsupportedEncodingException;
 import java.security.cert.X509Certificate;
 import java.security.cert.X509Certificate;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Collections;
@@ -365,6 +369,7 @@ public class EmailServerUtil
     static Transport makeSmtpTransport( final EmailServer server )
     static Transport makeSmtpTransport( final EmailServer server )
             throws MessagingException, PwmUnrecoverableException
             throws MessagingException, PwmUnrecoverableException
     {
     {
+        final Instant startTime = Instant.now();
         // Login to SMTP server first if both username and password is given
         // Login to SMTP server first if both username and password is given
         final boolean authenticated = !StringUtil.isEmpty( server.getUsername() ) && server.getPassword() != null;
         final boolean authenticated = !StringUtil.isEmpty( server.getUsername() ) && server.getPassword() != null;
 
 
@@ -385,7 +390,8 @@ public class EmailServerUtil
             transport.connect();
             transport.connect();
         }
         }
 
 
-        LOGGER.debug( () -> "connected to " + server.toDebugString() + " " + ( authenticated ? "(authenticated)" : "(unauthenticated)" ) );
+        LOGGER.debug( () -> "connected to " + server.toDebugString() + " " + ( authenticated ? "(authenticated)" : "(unauthenticated)" ),
+                () -> TimeDuration.fromCurrent( startTime ) );
 
 
         return transport;
         return transport;
     }
     }
@@ -420,4 +426,24 @@ public class EmailServerUtil
 
 
         return Collections.emptyList();
         return Collections.emptyList();
     }
     }
+
+    static List<HealthRecord> checkAllConfiguredServers( final List<EmailServer> emailServers )
+    {
+        final List<HealthRecord> records = new ArrayList<>();
+        for ( final EmailServer emailServer : emailServers )
+        {
+            try
+            {
+                final Transport transport = EmailServerUtil.makeSmtpTransport( emailServer );
+                transport.isConnected();
+                transport.close();
+            }
+            catch ( final Exception e )
+            {
+                records.add( HealthRecord.forMessage( HealthMessage.Email_ConnectFailure, emailServer.getId(), e.getMessage() ) );
+            }
+        }
+
+        return Collections.unmodifiableList( records );
+    }
 }
 }

+ 115 - 133
server/src/main/java/password/pwm/svc/email/EmailService.java

@@ -20,12 +20,10 @@
 
 
 package password.pwm.svc.email;
 package password.pwm.svc.email;
 
 
-import password.pwm.AppProperty;
 import password.pwm.PwmApplication;
 import password.pwm.PwmApplication;
 import password.pwm.PwmApplicationMode;
 import password.pwm.PwmApplicationMode;
 import password.pwm.bean.EmailItemBean;
 import password.pwm.bean.EmailItemBean;
 import password.pwm.config.Configuration;
 import password.pwm.config.Configuration;
-import password.pwm.config.PwmSetting;
 import password.pwm.config.option.DataStorageMethod;
 import password.pwm.config.option.DataStorageMethod;
 import password.pwm.error.ErrorInformation;
 import password.pwm.error.ErrorInformation;
 import password.pwm.error.PwmError;
 import password.pwm.error.PwmError;
@@ -38,8 +36,10 @@ import password.pwm.ldap.UserInfo;
 import password.pwm.svc.PwmService;
 import password.pwm.svc.PwmService;
 import password.pwm.svc.stats.Statistic;
 import password.pwm.svc.stats.Statistic;
 import password.pwm.svc.stats.StatisticsManager;
 import password.pwm.svc.stats.StatisticsManager;
-import password.pwm.util.java.AtomicLoopIntIncrementer;
+import password.pwm.util.java.ConditionalTaskExecutor;
 import password.pwm.util.java.JavaHelper;
 import password.pwm.util.java.JavaHelper;
+import password.pwm.util.java.JsonUtil;
+import password.pwm.util.java.StatisticIntCounterMap;
 import password.pwm.util.java.StringUtil;
 import password.pwm.util.java.StringUtil;
 import password.pwm.util.java.TimeDuration;
 import password.pwm.util.java.TimeDuration;
 import password.pwm.util.localdb.LocalDB;
 import password.pwm.util.localdb.LocalDB;
@@ -54,14 +54,10 @@ import javax.mail.Transport;
 import java.time.Instant;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicReference;
 
 
 /**
 /**
  * @author Jason D. Rivard
  * @author Jason D. Rivard
@@ -71,23 +67,26 @@ public class EmailService implements PwmService
     private static final PwmLogger LOGGER = PwmLogger.forClass( EmailService.class );
     private static final PwmLogger LOGGER = PwmLogger.forClass( EmailService.class );
 
 
     private PwmApplication pwmApplication;
     private PwmApplication pwmApplication;
-    private final Map<EmailServer, ErrorInformation> serverErrors = new ConcurrentHashMap<>( );
     private ErrorInformation startupError;
     private ErrorInformation startupError;
-    private final List<EmailServer> servers = new ArrayList<>( );
     private WorkQueueProcessor<EmailItemBean> workQueueProcessor;
     private WorkQueueProcessor<EmailItemBean> workQueueProcessor;
-    private AtomicLoopIntIncrementer serverIncrementer;
-    private Set<Integer> retryableStatusResponses = Collections.emptySet();
+    private EmailServiceSettings emailServiceSettings;
+    private EmailConnectionPool connectionPool;
 
 
-    private PwmService.STATUS status = STATUS.NEW;
+    private final AtomicReference<ErrorInformation> lastSendError = new AtomicReference<>();
+
+    private final ConditionalTaskExecutor statsLogger = ConditionalTaskExecutor.forPeriodicTask( this::logStats, TimeDuration.MINUTE );
 
 
-    private final ThreadLocal<EmailConnection> threadLocalTransport = new ThreadLocal<>();
+    private PwmService.STATUS status = STATUS.NEW;
 
 
     public void init( final PwmApplication pwmApplication )
     public void init( final PwmApplication pwmApplication )
             throws PwmException
             throws PwmException
     {
     {
         status = STATUS.OPENING;
         status = STATUS.OPENING;
         this.pwmApplication = pwmApplication;
         this.pwmApplication = pwmApplication;
+        this.emailServiceSettings = EmailServiceSettings.fromConfiguration( pwmApplication.getConfig() );
+        LOGGER.trace( () -> "initializing with settings: " + JsonUtil.serialize( emailServiceSettings ) );
 
 
+        final List<EmailServer> servers = new ArrayList<>();
         try
         try
         {
         {
             servers.addAll( EmailServerUtil.makeEmailServersMap( pwmApplication.getConfig() ) );
             servers.addAll( EmailServerUtil.makeEmailServersMap( pwmApplication.getConfig() ) );
@@ -107,8 +106,6 @@ public class EmailService implements PwmService
             return;
             return;
         }
         }
 
 
-        serverIncrementer = AtomicLoopIntIncrementer.builder().ceiling( servers.size() - 1 ).build();
-
         if ( pwmApplication.getLocalDB() == null || pwmApplication.getLocalDB().status() != LocalDB.Status.OPEN )
         if ( pwmApplication.getLocalDB() == null || pwmApplication.getLocalDB().status() != LocalDB.Status.OPEN )
         {
         {
             LOGGER.warn( () -> "localdb is not open, EmailService will remain closed" );
             LOGGER.warn( () -> "localdb is not open, EmailService will remain closed" );
@@ -116,22 +113,23 @@ public class EmailService implements PwmService
             return;
             return;
         }
         }
 
 
+        LOGGER.debug( () -> "starting with settings: " + JsonUtil.serialize( emailServiceSettings ) );
+
         final WorkQueueProcessor.Settings settings = WorkQueueProcessor.Settings.builder()
         final WorkQueueProcessor.Settings settings = WorkQueueProcessor.Settings.builder()
-                .maxEvents( Integer.parseInt( pwmApplication.getConfig().readAppProperty( AppProperty.QUEUE_EMAIL_MAX_COUNT ) ) )
-                .retryDiscardAge( TimeDuration.of( pwmApplication.getConfig().readSettingAsLong( PwmSetting.EMAIL_MAX_QUEUE_AGE ), TimeDuration.Unit.SECONDS ) )
-                .retryInterval( TimeDuration.of(
-                        Long.parseLong( pwmApplication.getConfig().readAppProperty( AppProperty.QUEUE_EMAIL_RETRY_TIMEOUT_MS ) ),
-                        TimeDuration.Unit.MILLISECONDS )
-                )
-                .preThreads( Integer.parseInt( pwmApplication.getConfig().readAppProperty( AppProperty.QUEUE_EMAIL_MAX_THREADS ) ) )
+                .maxEvents( emailServiceSettings.getQueueMaxItems() )
+                .retryDiscardAge( emailServiceSettings.getQueueDiscardAge() )
+                .retryInterval( emailServiceSettings.getQueueRetryTimeout() )
+                .preThreads( emailServiceSettings.getMaxThreads() )
                 .build();
                 .build();
         final LocalDBStoredQueue localDBStoredQueue = LocalDBStoredQueue.createLocalDBStoredQueue( pwmApplication, pwmApplication.getLocalDB(), LocalDB.DB.EMAIL_QUEUE );
         final LocalDBStoredQueue localDBStoredQueue = LocalDBStoredQueue.createLocalDBStoredQueue( pwmApplication, pwmApplication.getLocalDB(), LocalDB.DB.EMAIL_QUEUE );
 
 
         workQueueProcessor = new WorkQueueProcessor<>( pwmApplication, localDBStoredQueue, settings, new EmailItemProcessor(), this.getClass() );
         workQueueProcessor = new WorkQueueProcessor<>( pwmApplication, localDBStoredQueue, settings, new EmailItemProcessor(), this.getClass() );
 
 
-        retryableStatusResponses = readRetryableStatusCodes( pwmApplication.getConfig() );
+        connectionPool = new EmailConnectionPool( servers, emailServiceSettings );
 
 
         status = STATUS.OPEN;
         status = STATUS.OPEN;
+
+        statsLogger.conditionallyExecuteTask();
     }
     }
 
 
     public void close( )
     public void close( )
@@ -140,6 +138,12 @@ public class EmailService implements PwmService
         if ( workQueueProcessor != null )
         if ( workQueueProcessor != null )
         {
         {
             workQueueProcessor.close();
             workQueueProcessor.close();
+            workQueueProcessor = null;
+        }
+        if ( connectionPool != null )
+        {
+            connectionPool.close();
+            connectionPool = null;
         }
         }
     }
     }
 
 
@@ -167,24 +171,25 @@ public class EmailService implements PwmService
         }
         }
 
 
         final List<HealthRecord> records = new ArrayList<>( );
         final List<HealthRecord> records = new ArrayList<>( );
-        final Map<EmailServer, ErrorInformation> localMap = new HashMap<>( serverErrors );
-        for ( final Map.Entry<EmailServer, ErrorInformation> entry : localMap.entrySet() )
         {
         {
-            final ErrorInformation errorInformation = entry.getValue();
-            records.add( HealthRecord.forMessage( HealthMessage.Email_SendFailure, errorInformation.toDebugStr() ) );
+            final ErrorInformation lastError = lastSendError.get();
+            if ( lastError != null )
+            {
+                records.add( HealthRecord.forMessage( HealthMessage.Email_SendFailure, lastError.toDebugStr() ) );
+
+            }
         }
         }
 
 
+        records.addAll( EmailServerUtil.checkAllConfiguredServers( connectionPool.getServers() ) );
+
         return Collections.unmodifiableList( records );
         return Collections.unmodifiableList( records );
     }
     }
 
 
     @Override
     @Override
     public ServiceInfoBean serviceInfo( )
     public ServiceInfoBean serviceInfo( )
     {
     {
-        final Map<String, String> debugItems = new LinkedHashMap<>();
-        if ( workQueueProcessor != null )
-        {
-            debugItems.putAll( workQueueProcessor.debugInfo() );
-        }
+        final Map<String, String> debugItems = stats();
+
         if ( status() == STATUS.OPEN )
         if ( status() == STATUS.OPEN )
         {
         {
             return new ServiceInfoBean( Collections.singletonList( DataStorageMethod.LOCALDB ), debugItems );
             return new ServiceInfoBean( Collections.singletonList( DataStorageMethod.LOCALDB ), debugItems );
@@ -223,11 +228,50 @@ public class EmailService implements PwmService
         }
         }
     }
     }
 
 
+    private void logStats()
+    {
+        LOGGER.trace( () -> "stats: " + StringUtil.mapToString( stats() ) );
+    }
+
+
+    private Map<String, String> stats()
+    {
+        final Map<String, String> stats = new TreeMap<>( );
+        if ( workQueueProcessor != null )
+        {
+            stats.putAll( workQueueProcessor.debugInfo() );
+            stats.put( "workQueueSize", String.valueOf( workQueueProcessor.queueSize() ) );
+        }
+        if ( connectionPool != null )
+        {
+            stats.put( "idleConnections", Integer.toString( connectionPool.idleConnectionCount() ) );
+            stats.put( "activeConnections", Integer.toString( connectionPool.activeConnectionCount() ) );
+
+            for ( final EmailServer emailServer : connectionPool.getServers() )
+            {
+                final StatisticIntCounterMap<EmailServer.ServerStat> serverStats = emailServer.getConnectionStats();
+                for ( final EmailServer.ServerStat serverStat : EmailServer.ServerStat.values() )
+                {
+                    final String name = serverStat.name() + "[" + emailServer.getId() + "]";
+                    stats.put( name, String.valueOf( serverStats.get( serverStat ) ) );
+                }
+                {
+                    final String name = "averageSendTime[" + emailServer.getId() + "]";
+                    final TimeDuration value =  TimeDuration.of( ( long ) emailServer.getAverageSendTime().getAverage(), TimeDuration.Unit.MILLISECONDS );
+                    stats.put( name, value.asCompactString() );
+
+                }
+            }
+        }
+
+        return Collections.unmodifiableMap( stats );
+    }
+
     private boolean determineIfItemCanBeDelivered( final EmailItemBean emailItem )
     private boolean determineIfItemCanBeDelivered( final EmailItemBean emailItem )
     {
     {
-        if ( servers.isEmpty() )
+        if ( status() != STATUS.OPEN )
         {
         {
-            LOGGER.debug( () -> "discarding email send event, no email servers configured" );
+            LOGGER.debug( () -> "discarding email send event, no service is not running" );
             return false;
             return false;
         }
         }
 
 
@@ -306,6 +350,8 @@ public class EmailService implements PwmService
             return;
             return;
         }
         }
 
 
+        checkIfServiceIsOpen();
+
         final EmailItemBean finalBean;
         final EmailItemBean finalBean;
         {
         {
             EmailItemBean workingItemBean = emailItem;
             EmailItemBean workingItemBean = emailItem;
@@ -347,6 +393,17 @@ public class EmailService implements PwmService
         {
         {
             LOGGER.warn( () -> "unable to add email to queue: " + e.getMessage() );
             LOGGER.warn( () -> "unable to add email to queue: " + e.getMessage() );
         }
         }
+
+        statsLogger.conditionallyExecuteTask();
+    }
+
+    private void checkIfServiceIsOpen()
+            throws PwmUnrecoverableException
+    {
+        if ( !STATUS.OPEN.equals( status ) )
+        {
+            throw new PwmUnrecoverableException( PwmError.ERROR_SERVICE_NOT_AVAILABLE, "email service is closed and will not accent new jobs" );
+        }
     }
     }
 
 
     public static void sendEmailSynchronous(
     public static void sendEmailSynchronous(
@@ -379,21 +436,6 @@ public class EmailService implements PwmService
         transport.close();
         transport.close();
     }
     }
 
 
-    private final AtomicInteger newThreadLocalTransport = new AtomicInteger();
-    private final AtomicInteger useExistingConnection = new AtomicInteger();
-    private final AtomicInteger useExistingTransport = new AtomicInteger();
-    private final AtomicInteger newConnectionCounter = new AtomicInteger();
-
-    private String stats( )
-    {
-        final Map<String, Integer> map = new HashMap<>();
-        map.put( "newThreadLocalTransport", newThreadLocalTransport.get() );
-        map.put( "useExistingConnection", newThreadLocalTransport.get() );
-        map.put( "useExistingTransport", useExistingTransport.get() );
-        map.put( "newConnectionCounter", newConnectionCounter.get() );
-        return StringUtil.mapToString( map );
-    }
-
     private WorkQueueProcessor.ProcessResult sendItem( final EmailItemBean emailItemBean )
     private WorkQueueProcessor.ProcessResult sendItem( final EmailItemBean emailItemBean )
     {
     {
         try
         try
@@ -403,7 +445,7 @@ public class EmailService implements PwmService
         }
         }
         catch ( final MessagingException | PwmException e )
         catch ( final MessagingException | PwmException e )
         {
         {
-            if ( EmailServerUtil.examineSendFailure( e, retryableStatusResponses ) )
+            if ( EmailServerUtil.examineSendFailure( e, emailServiceSettings.getRetryableStatusResponses() ) )
             {
             {
                 LOGGER.error( () -> "error sending email (" + e.getMessage() + ") " + emailItemBean.toDebugString() + ", will retry" );
                 LOGGER.error( () -> "error sending email (" + e.getMessage() + ") " + emailItemBean.toDebugString() + ", will retry" );
                 StatisticsManager.incrementStat( pwmApplication, Statistic.EMAIL_SEND_FAILURES );
                 StatisticsManager.incrementStat( pwmApplication, Statistic.EMAIL_SEND_FAILURES );
@@ -418,59 +460,37 @@ public class EmailService implements PwmService
         }
         }
     }
     }
 
 
+
     private void executeEmailSend( final EmailItemBean emailItemBean )
     private void executeEmailSend( final EmailItemBean emailItemBean )
             throws PwmUnrecoverableException, MessagingException
             throws PwmUnrecoverableException, MessagingException
     {
     {
-        EmailConnection serverTransport = null;
+        final Instant startTime = Instant.now();
+        EmailConnection emailConnection = null;
 
 
         try
         try
         {
         {
-            // create a new MimeMessage object (using the Session created above)
-            if ( threadLocalTransport.get() == null )
-            {
-
-                LOGGER.trace( () -> "initializing new threadLocal transport, stats: " + stats() );
-                threadLocalTransport.set( getSmtpTransport( ) );
-                newThreadLocalTransport.getAndIncrement();
-            }
-            else
-            {
-                LOGGER.trace( () -> "using existing threadLocal transport, stats: " + stats() );
-                useExistingTransport.getAndIncrement();
-            }
-
-            serverTransport = threadLocalTransport.get();
-
-            if ( !serverTransport.getTransport().isConnected() )
-            {
-                LOGGER.trace( () -> "connecting threadLocal transport, stats: " + stats() );
-                threadLocalTransport.set( getSmtpTransport( ) );
-                serverTransport = threadLocalTransport.get();
-                newConnectionCounter.getAndIncrement();
-            }
-            else
-            {
-                LOGGER.trace( () -> "using existing threadLocal: stats: " + stats() );
-                useExistingConnection.getAndIncrement();
-            }
+            emailConnection = connectionPool.getConnection();
 
 
             final List<Message> messages = EmailServerUtil.convertEmailItemToMessages(
             final List<Message> messages = EmailServerUtil.convertEmailItemToMessages(
                     emailItemBean,
                     emailItemBean,
                     this.pwmApplication.getConfig(),
                     this.pwmApplication.getConfig(),
-                    serverTransport.getEmailServer()
+                    emailConnection.getEmailServer()
             );
             );
 
 
             for ( final Message message : messages )
             for ( final Message message : messages )
             {
             {
                 message.saveChanges();
                 message.saveChanges();
-                serverTransport.getTransport().sendMessage( message, message.getAllRecipients() );
+                emailConnection.getTransport().sendMessage( message, message.getAllRecipients() );
             }
             }
 
 
-            serverErrors.remove( serverTransport.getEmailServer() );
+            emailConnection.incrementSentItems();
+            emailConnection.getEmailServer().getConnectionStats().increment( EmailServer.ServerStat.sendCount );
+            final TimeDuration sendTime = TimeDuration.fromCurrent( startTime );
+            emailConnection.getEmailServer().getAverageSendTime().update( sendTime.asMillis() );
+            lastSendError.set( null );
 
 
-            LOGGER.debug( () -> "sent email: " + emailItemBean.toDebugString() );
+            LOGGER.debug( () -> "sent email: " + emailItemBean.toDebugString(), () -> sendTime );
             StatisticsManager.incrementStat( pwmApplication, Statistic.EMAIL_SEND_SUCCESSES );
             StatisticsManager.incrementStat( pwmApplication, Statistic.EMAIL_SEND_SUCCESSES );
-
         }
         }
         catch ( final MessagingException | PwmException e )
         catch ( final MessagingException | PwmException e )
         {
         {
@@ -488,68 +508,30 @@ public class EmailService implements PwmService
                         new String[] {
                         new String[] {
                                 emailItemBean.toDebugString(),
                                 emailItemBean.toDebugString(),
                                 JavaHelper.readHostileExceptionMessage( e ),
                                 JavaHelper.readHostileExceptionMessage( e ),
-                        }
+                                }
                 );
                 );
             }
             }
 
 
-            if ( serverTransport != null )
+            if ( emailConnection != null )
             {
             {
-                serverErrors.put( serverTransport.getEmailServer(), errorInformation );
+                lastSendError.set( errorInformation );
+                emailConnection.getEmailServer().getConnectionStats().increment( EmailServer.ServerStat.sendFailures );
+
             }
             }
             LOGGER.error( errorInformation );
             LOGGER.error( errorInformation );
             throw e;
             throw e;
         }
         }
-    }
-
-    private EmailConnection getSmtpTransport( )
-            throws PwmUnrecoverableException
-    {
-
-        // the global server incrementer rotates the server list by 1 offset each attempt to get an smtp transport.
-        int nextSlot = serverIncrementer.next();
-
-        for ( int i = 0; i < servers.size(); i++ )
+        finally
         {
         {
-            nextSlot = nextSlot >= ( servers.size() - 1 )
-                    ? 0
-                    : nextSlot + 1;
-
-            final EmailServer server = servers.get( nextSlot );
-            try
+            if ( emailConnection != null )
             {
             {
-                final Transport transport = EmailServerUtil.makeSmtpTransport( server );
-
-                serverErrors.remove( server );
-                return new EmailConnection( server, transport );
-            }
-            catch ( final Exception e )
-            {
-                final String exceptionMsg = JavaHelper.readHostileExceptionMessage( e );
-                final String msg = "unable to connect to email server '" + server.toDebugString() + "', error: " + exceptionMsg;
-                final ErrorInformation errorInformation = new ErrorInformation( PwmError.ERROR_SERVICE_UNREACHABLE, msg );
-                serverErrors.put( server, errorInformation );
-                LOGGER.warn( () -> errorInformation.toDebugStr() );
+                connectionPool.returnEmailConnection( emailConnection );
             }
             }
         }
         }
 
 
-        throw PwmUnrecoverableException.newException( PwmError.ERROR_SERVICE_UNREACHABLE, "unable to reach any configured email server" );
+        statsLogger.conditionallyExecuteTask();
     }
     }
 
 
-    private static Set<Integer> readRetryableStatusCodes( final Configuration configuration )
-    {
-        final String rawAppProp = configuration.readAppProperty( AppProperty.SMTP_RETRYABLE_SEND_RESPONSE_STATUSES );
-        if ( StringUtil.isEmpty( rawAppProp ) )
-        {
-            return Collections.emptySet();
-        }
 
 
-        final Set<Integer> returnData = new HashSet<>();
-        for ( final String loopString : rawAppProp.split( "," ) )
-        {
-            final Integer loopInt = Integer.parseInt( loopString );
-            returnData.add( loopInt );
-        }
-        return Collections.unmodifiableSet( returnData );
-    }
 }
 }
 
 

+ 87 - 0
server/src/main/java/password/pwm/svc/email/EmailServiceSettings.java

@@ -0,0 +1,87 @@
+/*
+ * Password Management Servlets (PWM)
+ * http://www.pwm-project.org
+ *
+ * Copyright (c) 2006-2009 Novell, Inc.
+ * Copyright (c) 2009-2020 The PWM Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package password.pwm.svc.email;
+
+import lombok.Builder;
+import lombok.Value;
+import password.pwm.AppProperty;
+import password.pwm.config.Configuration;
+import password.pwm.config.PwmSetting;
+import password.pwm.util.java.StringUtil;
+import password.pwm.util.java.TimeDuration;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+@Value
+@Builder
+public class EmailServiceSettings implements Serializable
+{
+    private static final long serialVersionUID = 0L;
+
+    private final TimeDuration connectionSendItemDuration;
+    private final TimeDuration queueRetryTimeout;
+    private final TimeDuration queueDiscardAge;
+    private final int connectionSendItemLimit;
+    private final int maxThreads;
+    private final int queueMaxItems;
+    private final Set<Integer> retryableStatusResponses;
+
+
+    static EmailServiceSettings fromConfiguration( final Configuration configuration )
+    {
+        return builder()
+                .maxThreads( Integer.parseInt( configuration.readAppProperty( AppProperty.QUEUE_EMAIL_MAX_THREADS ) ) )
+                .connectionSendItemDuration( TimeDuration.of(
+                        Integer.parseInt( configuration.readAppProperty( AppProperty.QUEUE_EMAIL_MAX_SECONDS_PER_CONNECTION ) ),
+                        TimeDuration.Unit.SECONDS ) )
+                .connectionSendItemLimit( Integer.parseInt( configuration.readAppProperty( AppProperty.QUEUE_EMAIL_MAX_ITEMS_PER_CONNECTION ) ) )
+                .queueRetryTimeout( TimeDuration.of(
+                        Long.parseLong( configuration.readAppProperty( AppProperty.QUEUE_EMAIL_RETRY_TIMEOUT_MS ) ),
+                        TimeDuration.Unit.MILLISECONDS )
+                )
+                .queueDiscardAge( TimeDuration.of( configuration.readSettingAsLong( PwmSetting.EMAIL_MAX_QUEUE_AGE ), TimeDuration.Unit.SECONDS ) )
+                .queueMaxItems( Integer.parseInt( configuration.readAppProperty( AppProperty.QUEUE_EMAIL_MAX_COUNT ) ) )
+                .retryableStatusResponses( readRetryableStatusCodes( configuration ) )
+                .build();
+    }
+
+    private static Set<Integer> readRetryableStatusCodes( final Configuration configuration )
+    {
+        final String rawAppProp = configuration.readAppProperty( AppProperty.SMTP_RETRYABLE_SEND_RESPONSE_STATUSES );
+        if ( StringUtil.isEmpty( rawAppProp ) )
+        {
+            return Collections.emptySet();
+        }
+
+        final Set<Integer> returnData = new HashSet<>();
+        for ( final String loopString : rawAppProp.split( "," ) )
+        {
+            final Integer loopInt = Integer.parseInt( loopString );
+            returnData.add( loopInt );
+        }
+        return Collections.unmodifiableSet( returnData );
+    }
+
+
+}

+ 1 - 1
server/src/main/java/password/pwm/svc/wordlist/WordlistImporter.java

@@ -67,7 +67,7 @@ class WordlistImporter implements Runnable
     private ErrorInformation exitError;
     private ErrorInformation exitError;
     private Instant startTime = Instant.now();
     private Instant startTime = Instant.now();
     private long bytesSkipped;
     private long bytesSkipped;
-    private Map<WordType, Long> seenWordTypes = new HashMap<>();
+    private final Map<WordType, Long> seenWordTypes = new HashMap<>();
     private boolean completed;
     private boolean completed;
 
 
     private enum DebugKey
     private enum DebugKey

+ 62 - 0
server/src/main/java/password/pwm/util/java/StatisticIntCounterMap.java

@@ -0,0 +1,62 @@
+/*
+ * Password Management Servlets (PWM)
+ * http://www.pwm-project.org
+ *
+ * Copyright (c) 2006-2009 Novell, Inc.
+ * Copyright (c) 2009-2020 The PWM Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package password.pwm.util.java;
+
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class StatisticIntCounterMap<K extends Enum<K>>
+{
+    private final Map<K, AtomicLoopIntIncrementer> statMap;
+
+    public StatisticIntCounterMap( final Class<K> keyType )
+    {
+        final EnumMap<K, AtomicLoopIntIncrementer> enumMap = new EnumMap<>( keyType );
+        for ( final K loopStatEnum : EnumSet.allOf( keyType ) )
+        {
+            enumMap.put( loopStatEnum, new AtomicLoopIntIncrementer() );
+        }
+        statMap = Collections.unmodifiableMap( enumMap );
+    }
+
+    public void increment( final K stat )
+    {
+        statMap.get( stat ).next();
+    }
+
+    public int get( final K stat )
+    {
+        return statMap.get( stat ).get();
+    }
+
+    public Map<String, String> debugStats()
+    {
+        return Collections.unmodifiableMap( statMap.entrySet()
+                .stream()
+                .collect( Collectors.toMap(
+                        ( entry ) -> entry.getKey().name(),
+                        ( entry ) -> String.valueOf( entry.getValue().get() ) ) ) );
+    }
+
+}

+ 24 - 17
server/src/main/java/password/pwm/util/localdb/WorkQueueProcessor.java

@@ -33,6 +33,7 @@ import password.pwm.util.java.AtomicLoopIntIncrementer;
 import password.pwm.util.java.JavaHelper;
 import password.pwm.util.java.JavaHelper;
 import password.pwm.util.java.JsonUtil;
 import password.pwm.util.java.JsonUtil;
 import password.pwm.util.java.MovingAverage;
 import password.pwm.util.java.MovingAverage;
+import password.pwm.util.java.StatisticIntCounterMap;
 import password.pwm.util.java.StringUtil;
 import password.pwm.util.java.StringUtil;
 import password.pwm.util.java.TimeDuration;
 import password.pwm.util.java.TimeDuration;
 import password.pwm.util.logging.PwmLogger;
 import password.pwm.util.logging.PwmLogger;
@@ -51,7 +52,6 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.LockSupport;
 import java.util.concurrent.locks.LockSupport;
 import java.util.function.Supplier;
 import java.util.function.Supplier;
 
 
@@ -76,13 +76,18 @@ public final class WorkQueueProcessor<W extends Serializable>
 
 
     private ThreadPoolExecutor executorService;
     private ThreadPoolExecutor executorService;
 
 
-    private final MovingAverage avgLagTime = new MovingAverage( TimeDuration.HOUR );
-    private final EventRateMeter sendRate = new EventRateMeter( TimeDuration.HOUR );
+    private final MovingAverage avgLagTime = new MovingAverage( TimeDuration.MINUTE );
+    private final EventRateMeter sendRate = new EventRateMeter( TimeDuration.MINUTE );
 
 
-    private final AtomicInteger preQueueSubmit = new AtomicInteger( 0 );
-    private final AtomicInteger preQueueBypass = new AtomicInteger( 0 );
-    private final AtomicInteger preQueueFallback = new AtomicInteger( 0 );
-    private final AtomicInteger queueProcessItems = new AtomicInteger( 0 );
+    private final StatisticIntCounterMap<WorkQueueStat> workQueueStats = new StatisticIntCounterMap<>( WorkQueueStat.class );
+
+    enum WorkQueueStat
+    {
+        preQueueSubmit,
+        preQueueBypass,
+        preQueueFallback,
+        queueProcessItems,
+    }
 
 
     public enum ProcessResult
     public enum ProcessResult
     {
     {
@@ -124,9 +129,10 @@ public final class WorkQueueProcessor<W extends Serializable>
                     settings.getPreThreads(),
                     settings.getPreThreads(),
                     1,
                     1,
                     TimeUnit.MINUTES,
                     TimeUnit.MINUTES,
-                    new ArrayBlockingQueue<>( settings.getPreThreads() ),
+                    new ArrayBlockingQueue<>( settings.getPreThreads() + 1 ),
                     threadFactory
                     threadFactory
             );
             );
+            executorService.getActiveCount();
             executorService.allowCoreThreadTimeOut( true );
             executorService.allowCoreThreadTimeOut( true );
         }
         }
     }
     }
@@ -190,12 +196,12 @@ public final class WorkQueueProcessor<W extends Serializable>
             try
             try
             {
             {
                 executorService.execute( ( ) -> sendAndQueueIfNecessary( itemWrapper ) );
                 executorService.execute( ( ) -> sendAndQueueIfNecessary( itemWrapper ) );
-                preQueueSubmit.incrementAndGet();
+                workQueueStats.increment( WorkQueueStat.preQueueSubmit );
             }
             }
             catch ( final RejectedExecutionException e )
             catch ( final RejectedExecutionException e )
             {
             {
                 submitToQueue( itemWrapper );
                 submitToQueue( itemWrapper );
-                preQueueBypass.incrementAndGet();
+                workQueueStats.increment( WorkQueueStat.preQueueBypass );
             }
             }
         }
         }
         else
         else
@@ -216,7 +222,7 @@ public final class WorkQueueProcessor<W extends Serializable>
             }
             }
             else if ( processResult == ProcessResult.RETRY || processResult == ProcessResult.NOOP )
             else if ( processResult == ProcessResult.RETRY || processResult == ProcessResult.NOOP )
             {
             {
-                preQueueFallback.incrementAndGet();
+                workQueueStats.increment( WorkQueueStat.preQueueFallback );
                 try
                 try
                 {
                 {
                     submitToQueue( itemWrapper );
                     submitToQueue( itemWrapper );
@@ -430,7 +436,7 @@ public final class WorkQueueProcessor<W extends Serializable>
             final ProcessResult processResult;
             final ProcessResult processResult;
             try
             try
             {
             {
-                queueProcessItems.incrementAndGet();
+                workQueueStats.increment( WorkQueueStat.queueProcessItems );
                 processResult = itemProcessor.process( itemWrapper.getWorkItem() );
                 processResult = itemProcessor.process( itemWrapper.getWorkItem() );
                 if ( processResult == null )
                 if ( processResult == null )
                 {
                 {
@@ -593,14 +599,15 @@ public final class WorkQueueProcessor<W extends Serializable>
         final Map<String, String> output = new HashMap<>();
         final Map<String, String> output = new HashMap<>();
         output.put( "avgLagTime", TimeDuration.of( ( long ) avgLagTime.getAverage(), TimeDuration.Unit.MILLISECONDS ).asCompactString() );
         output.put( "avgLagTime", TimeDuration.of( ( long ) avgLagTime.getAverage(), TimeDuration.Unit.MILLISECONDS ).asCompactString() );
         output.put( "sendRate", sendRate.readEventRate().setScale( 2, RoundingMode.DOWN ) + "/s" );
         output.put( "sendRate", sendRate.readEventRate().setScale( 2, RoundingMode.DOWN ) + "/s" );
-        output.put( "preQueueSubmit", String.valueOf( preQueueSubmit.get() ) );
-        output.put( "preQueueBypass", String.valueOf( preQueueBypass.get() ) );
-        output.put( "preQueueFallback", String.valueOf( preQueueFallback.get() ) );
-        output.put( "queueProcessItems", String.valueOf( queueProcessItems.get() ) );
         if ( executorService != null )
         if ( executorService != null )
         {
         {
-            output.put( "activeThreads", String.valueOf( executorService.getActiveCount() ) );
+            output.put( "preQueueThreads", String.valueOf( executorService.getActiveCount() ) );
+        }
+        if ( workerThread != null )
+        {
+            output.put( "postQueueThreads", workerThread.isRunning() ? "1" : "0" );
         }
         }
+        output.putAll( workQueueStats.debugStats() );
         return Collections.unmodifiableMap( output );
         return Collections.unmodifiableMap( output );
     }
     }
 }
 }

+ 3 - 1
server/src/main/resources/password/pwm/AppProperty.properties

@@ -282,7 +282,9 @@ pwNotify.maxLdapSearchSize=1000000
 pwNotify.maxSkipRerunWindowSeconds=86400
 pwNotify.maxSkipRerunWindowSeconds=86400
 queue.email.retryTimeoutMs=10000
 queue.email.retryTimeoutMs=10000
 queue.email.maxCount=100000
 queue.email.maxCount=100000
-queue.email.maxThreads=0
+queue.email.maxThreads=10
+queue.email.maxItemsPerConnection=10000
+queue.email.maxSecondsPerConnection=120
 queue.sms.retryTimeoutMs=10000
 queue.sms.retryTimeoutMs=10000
 queue.sms.maxCount=100000
 queue.sms.maxCount=100000
 queue.syslog.retryTimeoutMs=30000
 queue.syslog.retryTimeoutMs=30000

+ 1 - 0
server/src/main/resources/password/pwm/i18n/Health.properties

@@ -34,6 +34,7 @@ HealthMessage_LDAP_TestUserOK=LDAP test user account is functioning normally for
 HealthMessage_LDAP_AD_Unsecure=%1% is not configured as a secure connection.  Active Directory requires a secure connection to allow password changes.
 HealthMessage_LDAP_AD_Unsecure=%1% is not configured as a secure connection.  Active Directory requires a secure connection to allow password changes.
 HealthMessage_LDAP_AD_StaticIP=%1% should be configured using a dns hostname instead of an IP address.  Active Directory can sometimes have errors when using an IP address for configuration.
 HealthMessage_LDAP_AD_StaticIP=%1% should be configured using a dns hostname instead of an IP address.  Active Directory can sometimes have errors when using an IP address for configuration.
 HealthMessage_Email_SendFailure=Unable to send email due to error: %1%
 HealthMessage_Email_SendFailure=Unable to send email due to error: %1%
+HealthMessage_Email_ConnectFailure=Unable to connect to SMTP email profile '%1%', error: %2%
 HealthMessage_PwNotify_Failure=Error while sending password notification emails: %1%
 HealthMessage_PwNotify_Failure=Error while sending password notification emails: %1%
 HealthMessage_MissingResource=missing resource: bundle=%1%, locale=%2%, key=%3%
 HealthMessage_MissingResource=missing resource: bundle=%1%, locale=%2%, key=%3%
 HealthMessage_BrokenMethod=broken method invocation for '%1%', error: %2%
 HealthMessage_BrokenMethod=broken method invocation for '%1%', error: %2%