|
@@ -23,10 +23,7 @@
|
|
|
package password.pwm.svc.pwnotify;
|
|
|
|
|
|
import com.novell.ldapchai.ChaiUser;
|
|
|
-import com.novell.ldapchai.exception.ChaiOperationException;
|
|
|
-import com.novell.ldapchai.exception.ChaiUnavailableException;
|
|
|
import password.pwm.PwmApplication;
|
|
|
-import password.pwm.PwmConstants;
|
|
|
import password.pwm.bean.EmailItemBean;
|
|
|
import password.pwm.bean.SessionLabel;
|
|
|
import password.pwm.bean.UserIdentity;
|
|
@@ -36,12 +33,11 @@ import password.pwm.error.PwmError;
|
|
|
import password.pwm.error.PwmOperationalException;
|
|
|
import password.pwm.error.PwmUnrecoverableException;
|
|
|
import password.pwm.ldap.LdapOperationsHelper;
|
|
|
-import password.pwm.ldap.LdapPermissionTester;
|
|
|
import password.pwm.ldap.UserInfo;
|
|
|
import password.pwm.ldap.UserInfoFactory;
|
|
|
-import password.pwm.svc.stats.EventRateMeter;
|
|
|
import password.pwm.svc.stats.Statistic;
|
|
|
import password.pwm.svc.stats.StatisticsManager;
|
|
|
+import password.pwm.util.LocaleHelper;
|
|
|
import password.pwm.util.java.ConditionalTaskExecutor;
|
|
|
import password.pwm.util.java.JavaHelper;
|
|
|
import password.pwm.util.java.TimeDuration;
|
|
@@ -50,15 +46,18 @@ import password.pwm.util.macro.MacroMachine;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
import java.io.Writer;
|
|
|
-import java.math.BigDecimal;
|
|
|
import java.time.Duration;
|
|
|
import java.time.Instant;
|
|
|
import java.time.temporal.ChronoUnit;
|
|
|
-import java.util.ArrayList;
|
|
|
-import java.util.Collection;
|
|
|
import java.util.Iterator;
|
|
|
import java.util.List;
|
|
|
import java.util.Locale;
|
|
|
+import java.util.concurrent.LinkedBlockingDeque;
|
|
|
+import java.util.concurrent.ThreadFactory;
|
|
|
+import java.util.concurrent.ThreadPoolExecutor;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.concurrent.atomic.AtomicInteger;
|
|
|
+import java.util.function.Supplier;
|
|
|
|
|
|
public class PwNotifyEngine
|
|
|
{
|
|
@@ -66,31 +65,37 @@ public class PwNotifyEngine
|
|
|
|
|
|
private static final SessionLabel SESSION_LABEL = SessionLabel.PW_EXP_NOTICE_LABEL;
|
|
|
|
|
|
+ private static final int MAX_LOG_SIZE = 1024 * 1024 * 1024;
|
|
|
+
|
|
|
private final PwNotifySettings settings;
|
|
|
private final PwmApplication pwmApplication;
|
|
|
private final Writer debugWriter;
|
|
|
private final StringBuffer internalLog = new StringBuffer( );
|
|
|
private final List<UserPermission> permissionList;
|
|
|
+ private final PwNotifyStorageService storageService;
|
|
|
+ private final Supplier<Boolean> cancelFlag;
|
|
|
|
|
|
private final ConditionalTaskExecutor debugOutputTask = new ConditionalTaskExecutor(
|
|
|
this::periodicDebugOutput,
|
|
|
new ConditionalTaskExecutor.TimeDurationPredicate( 1, TimeDuration.Unit.MINUTES )
|
|
|
);
|
|
|
|
|
|
- private EventRateMeter eventRateMeter = new EventRateMeter( TimeDuration.of( 5, TimeDuration.Unit.MINUTES ) );
|
|
|
-
|
|
|
- private int examinedCount = 0;
|
|
|
- private int noticeCount = 0;
|
|
|
+ private final AtomicInteger examinedCount = new AtomicInteger( 0 );
|
|
|
+ private final AtomicInteger noticeCount = new AtomicInteger( 0 );
|
|
|
private Instant startTime;
|
|
|
|
|
|
private volatile boolean running;
|
|
|
|
|
|
PwNotifyEngine(
|
|
|
final PwmApplication pwmApplication,
|
|
|
+ final PwNotifyStorageService storageService,
|
|
|
+ final Supplier<Boolean> cancelFlag,
|
|
|
final Writer debugWriter
|
|
|
)
|
|
|
{
|
|
|
this.pwmApplication = pwmApplication;
|
|
|
+ this.cancelFlag = cancelFlag;
|
|
|
+ this.storageService = storageService;
|
|
|
this.settings = PwNotifySettings.fromConfiguration( pwmApplication.getConfig() );
|
|
|
this.debugWriter = debugWriter;
|
|
|
this.permissionList = pwmApplication.getConfig().readSettingAsUserPermission( PwmSetting.PW_EXPY_NOTIFY_PERMISSION );
|
|
@@ -125,17 +130,17 @@ public class PwNotifyEngine
|
|
|
}
|
|
|
|
|
|
void executeJob( )
|
|
|
- throws ChaiUnavailableException, ChaiOperationException, PwmOperationalException, PwmUnrecoverableException
|
|
|
+ throws PwmOperationalException, PwmUnrecoverableException
|
|
|
{
|
|
|
startTime = Instant.now();
|
|
|
- examinedCount = 0;
|
|
|
- noticeCount = 0;
|
|
|
+ examinedCount.set( 0 );
|
|
|
+ noticeCount.set( 0 );
|
|
|
try
|
|
|
{
|
|
|
internalLog.delete( 0, internalLog.length() );
|
|
|
running = true;
|
|
|
|
|
|
- if ( !canRunOnThisServer() )
|
|
|
+ if ( !canRunOnThisServer() || cancelFlag.get() )
|
|
|
{
|
|
|
return;
|
|
|
}
|
|
@@ -150,46 +155,30 @@ public class PwNotifyEngine
|
|
|
}
|
|
|
|
|
|
log( "starting job, beginning ldap search" );
|
|
|
- final Iterator<UserIdentity> workQueue = LdapOperationsHelper.readAllUsersFromLdap(
|
|
|
+ final Iterator<UserIdentity> workQueue = LdapOperationsHelper.readUsersFromLdapForPermissions(
|
|
|
pwmApplication,
|
|
|
- null,
|
|
|
- null,
|
|
|
+ SESSION_LABEL,
|
|
|
+ permissionList,
|
|
|
settings.getMaxLdapSearchSize()
|
|
|
);
|
|
|
|
|
|
log( "ldap search complete, examining users..." );
|
|
|
+
|
|
|
+ final ThreadPoolExecutor threadPoolExecutor = createExecutor( pwmApplication );
|
|
|
while ( workQueue.hasNext() )
|
|
|
{
|
|
|
- if ( !checkIfRunningOnMaster() )
|
|
|
+ if ( !checkIfRunningOnMaster() || cancelFlag.get() )
|
|
|
{
|
|
|
final String msg = "job interrupted, server is no longer the cluster master.";
|
|
|
log( msg );
|
|
|
throw PwmUnrecoverableException.newException( PwmError.ERROR_SERVICE_NOT_AVAILABLE, msg );
|
|
|
}
|
|
|
|
|
|
- checkIfRunningOnMaster( );
|
|
|
- examinedCount++;
|
|
|
-
|
|
|
- final List<UserIdentity> batch = new ArrayList<>( );
|
|
|
- final int batchSize = settings.getBatchCount();
|
|
|
-
|
|
|
- while ( batch.size() < batchSize && workQueue.hasNext() )
|
|
|
- {
|
|
|
- batch.add( workQueue.next() );
|
|
|
- }
|
|
|
+ threadPoolExecutor.submit( new ProcessJob( workQueue.next() ) );
|
|
|
+ }
|
|
|
|
|
|
- final Instant startBatch = Instant.now();
|
|
|
- examinedCount += batch.size();
|
|
|
- noticeCount += processBatch( batch );
|
|
|
- eventRateMeter.markEvents( batchSize );
|
|
|
- final TimeDuration batchTime = TimeDuration.fromCurrent( startBatch );
|
|
|
- final TimeDuration pauseTime = TimeDuration.of(
|
|
|
- settings.getBatchTimeMultiplier().multiply( new BigDecimal( batchTime.asMillis() ) ).longValue(),
|
|
|
- TimeDuration.Unit.MILLISECONDS );
|
|
|
- pauseTime.pause();
|
|
|
+ JavaHelper.closeAndWaitExecutor( threadPoolExecutor, TimeDuration.DAY );
|
|
|
|
|
|
- debugOutputTask.conditionallyExecuteTask();
|
|
|
- }
|
|
|
log( "job complete, " + examinedCount + " users evaluated in " + TimeDuration.fromCurrent( startTime ).asCompactString()
|
|
|
+ ", sent " + noticeCount + " notices."
|
|
|
);
|
|
@@ -202,63 +191,78 @@ public class PwNotifyEngine
|
|
|
|
|
|
private void periodicDebugOutput()
|
|
|
{
|
|
|
- log( "job in progress, " + examinedCount + " users evaluated in " + TimeDuration.fromCurrent( startTime ).asCompactString()
|
|
|
- + ", sent " + noticeCount + " notices."
|
|
|
- );
|
|
|
+ final String msg = "job in progress, " + examinedCount + " users evaluated in "
|
|
|
+ + TimeDuration.fromCurrent( startTime ).asCompactString()
|
|
|
+ + ", sent " + noticeCount + " notices.";
|
|
|
+ log( msg );
|
|
|
}
|
|
|
|
|
|
- private int processBatch( final Collection<UserIdentity> batch )
|
|
|
- throws PwmUnrecoverableException
|
|
|
+ private class ProcessJob implements Runnable
|
|
|
{
|
|
|
- int count = 0;
|
|
|
- for ( final UserIdentity userIdentity : batch )
|
|
|
+ final UserIdentity userIdentity;
|
|
|
+
|
|
|
+ ProcessJob( final UserIdentity userIdentity )
|
|
|
+ {
|
|
|
+ this.userIdentity = userIdentity;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void run()
|
|
|
{
|
|
|
- if ( processUserIdentity( userIdentity ) )
|
|
|
+ try
|
|
|
{
|
|
|
- count++;
|
|
|
+ processUserIdentity( userIdentity );
|
|
|
+ debugOutputTask.conditionallyExecuteTask();
|
|
|
+ }
|
|
|
+ catch ( Exception e )
|
|
|
+ {
|
|
|
+ LOGGER.trace( "unexpected error processing user '" + userIdentity.toDisplayString() + "', error: " + e.getMessage() );
|
|
|
}
|
|
|
}
|
|
|
- return count;
|
|
|
}
|
|
|
|
|
|
- private boolean processUserIdentity(
|
|
|
+ private void processUserIdentity(
|
|
|
final UserIdentity userIdentity
|
|
|
)
|
|
|
throws PwmUnrecoverableException
|
|
|
{
|
|
|
- if ( !LdapPermissionTester.testUserPermissions( pwmApplication, SessionLabel.SYSTEM_LABEL, userIdentity, permissionList ) )
|
|
|
+ if ( !canRunOnThisServer() || cancelFlag.get() )
|
|
|
{
|
|
|
- return false;
|
|
|
+ return;
|
|
|
}
|
|
|
|
|
|
+ examinedCount.incrementAndGet();
|
|
|
final ChaiUser theUser = pwmApplication.getProxiedChaiUser( userIdentity );
|
|
|
final Instant passwordExpirationTime = LdapOperationsHelper.readPasswordExpirationTime( theUser );
|
|
|
|
|
|
- if ( passwordExpirationTime == null || passwordExpirationTime.isBefore( Instant.now() ) )
|
|
|
+ if ( passwordExpirationTime == null )
|
|
|
{
|
|
|
- return false;
|
|
|
+ LOGGER.trace( SESSION_LABEL, "skipping user '" + userIdentity.toDisplayString() + "', has no password expiration" );
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ if ( passwordExpirationTime.isBefore( Instant.now() ) )
|
|
|
+ {
|
|
|
+ LOGGER.trace( SESSION_LABEL, "skipping user '" + userIdentity.toDisplayString() + "', password expiration is in the past" );
|
|
|
+ return;
|
|
|
}
|
|
|
|
|
|
final int nextDayInterval = figureNextDayInterval( passwordExpirationTime );
|
|
|
if ( nextDayInterval < 1 )
|
|
|
{
|
|
|
- return false;
|
|
|
+ LOGGER.trace( SESSION_LABEL, "skipping user '" + userIdentity.toDisplayString() + "', password expiration time is not within an interval" );
|
|
|
+ return;
|
|
|
}
|
|
|
|
|
|
if ( checkIfNoticeAlreadySent( userIdentity, passwordExpirationTime, nextDayInterval ) )
|
|
|
{
|
|
|
log( "notice for interval " + nextDayInterval + " already sent for " + userIdentity.toDisplayString() );
|
|
|
- return false;
|
|
|
+ return;
|
|
|
}
|
|
|
|
|
|
log( "sending notice to " + userIdentity.toDisplayString() + " for interval " + nextDayInterval );
|
|
|
- {
|
|
|
- final PwNotifyDbStorageService dbStorage = new PwNotifyDbStorageService( pwmApplication );
|
|
|
- dbStorage.writeStoredState( userIdentity, SESSION_LABEL, new StoredNotificationState( passwordExpirationTime, Instant.now(), nextDayInterval ) );
|
|
|
- }
|
|
|
-
|
|
|
+ storageService.writeStoredUserState( userIdentity, SESSION_LABEL, new StoredNotificationState( passwordExpirationTime, Instant.now(), nextDayInterval ) );
|
|
|
sendNoticeEmail( userIdentity );
|
|
|
- return true;
|
|
|
}
|
|
|
|
|
|
private int figureNextDayInterval(
|
|
@@ -292,8 +296,7 @@ public class PwNotifyEngine
|
|
|
)
|
|
|
throws PwmUnrecoverableException
|
|
|
{
|
|
|
- final PwNotifyDbStorageService dbStorage = new PwNotifyDbStorageService( pwmApplication );
|
|
|
- final StoredNotificationState storedState = dbStorage.readStoredState( userIdentity, SESSION_LABEL );
|
|
|
+ final StoredNotificationState storedState = storageService.readStoredUserState( userIdentity, SESSION_LABEL );
|
|
|
|
|
|
if ( storedState == null )
|
|
|
{
|
|
@@ -316,18 +319,19 @@ public class PwNotifyEngine
|
|
|
private void sendNoticeEmail( final UserIdentity userIdentity )
|
|
|
throws PwmUnrecoverableException
|
|
|
{
|
|
|
- final Locale userLocale = PwmConstants.DEFAULT_LOCALE;
|
|
|
- final EmailItemBean emailItemBean = pwmApplication.getConfig().readSettingAsEmail(
|
|
|
- PwmSetting.EMAIL_PW_EXPIRATION_NOTICE,
|
|
|
- userLocale
|
|
|
- );
|
|
|
- final MacroMachine macroMachine = MacroMachine.forUser( pwmApplication, userLocale, SESSION_LABEL, userIdentity );
|
|
|
- final UserInfo userInfoBean = UserInfoFactory.newUserInfoUsingProxy(
|
|
|
+ final UserInfo userInfoBean = UserInfoFactory.newUserInfoUsingProxyForOfflineUser(
|
|
|
pwmApplication,
|
|
|
SESSION_LABEL,
|
|
|
- userIdentity, userLocale
|
|
|
+ userIdentity
|
|
|
+ );
|
|
|
+ final Locale ldapLocale = LocaleHelper.parseLocaleString( userInfoBean.getLanguage() );
|
|
|
+ final MacroMachine macroMachine = MacroMachine.forUser( pwmApplication, ldapLocale, SESSION_LABEL, userIdentity );
|
|
|
+ final EmailItemBean emailItemBean = pwmApplication.getConfig().readSettingAsEmail(
|
|
|
+ PwmSetting.EMAIL_PW_EXPIRATION_NOTICE,
|
|
|
+ ldapLocale
|
|
|
);
|
|
|
|
|
|
+ noticeCount.incrementAndGet();
|
|
|
StatisticsManager.incrementStat( pwmApplication, Statistic.PWNOTIFY_EMAILS_SENT );
|
|
|
pwmApplication.getEmailQueue().submitEmail( emailItemBean, userInfoBean, macroMachine );
|
|
|
}
|
|
@@ -353,7 +357,7 @@ public class PwNotifyEngine
|
|
|
}
|
|
|
|
|
|
internalLog.append( msg );
|
|
|
- while ( internalLog.length() > 1024 * 1024 * 1024 )
|
|
|
+ while ( internalLog.length() > MAX_LOG_SIZE )
|
|
|
{
|
|
|
final int nextLf = internalLog.indexOf( "\n" );
|
|
|
if ( nextLf > 0 )
|
|
@@ -368,4 +372,19 @@ public class PwNotifyEngine
|
|
|
|
|
|
LOGGER.trace( SessionLabel.PWNOTIFY_SESSION_LABEL, output );
|
|
|
}
|
|
|
+
|
|
|
+ private ThreadPoolExecutor createExecutor( final PwmApplication pwmApplication )
|
|
|
+ {
|
|
|
+ final ThreadFactory threadFactory = JavaHelper.makePwmThreadFactory( JavaHelper.makeThreadName( pwmApplication, this.getClass() ), true );
|
|
|
+ final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
|
|
|
+ 1,
|
|
|
+ 10,
|
|
|
+ 1,
|
|
|
+ TimeUnit.MINUTES,
|
|
|
+ new LinkedBlockingDeque<>(),
|
|
|
+ threadFactory
|
|
|
+ );
|
|
|
+ threadPoolExecutor.allowCoreThreadTimeOut( true );
|
|
|
+ return threadPoolExecutor;
|
|
|
+ }
|
|
|
}
|