|
@@ -35,7 +35,6 @@ import password.pwm.error.PwmUnrecoverableException;
|
|
|
import password.pwm.ldap.LdapOperationsHelper;
|
|
|
import password.pwm.ldap.UserInfo;
|
|
|
import password.pwm.ldap.UserInfoFactory;
|
|
|
-import password.pwm.svc.stats.EventRateMeter;
|
|
|
import password.pwm.svc.stats.Statistic;
|
|
|
import password.pwm.svc.stats.StatisticsManager;
|
|
|
import password.pwm.util.LocaleHelper;
|
|
@@ -47,16 +46,18 @@ import password.pwm.util.macro.MacroMachine;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
import java.io.Writer;
|
|
|
-import java.math.BigDecimal;
|
|
|
import java.time.Duration;
|
|
|
import java.time.Instant;
|
|
|
import java.time.temporal.ChronoUnit;
|
|
|
-import java.util.ArrayList;
|
|
|
-import java.util.Collection;
|
|
|
import java.util.Iterator;
|
|
|
import java.util.List;
|
|
|
import java.util.Locale;
|
|
|
+import java.util.concurrent.LinkedBlockingDeque;
|
|
|
+import java.util.concurrent.ThreadFactory;
|
|
|
+import java.util.concurrent.ThreadPoolExecutor;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
+import java.util.function.Supplier;
|
|
|
|
|
|
public class PwNotifyEngine
|
|
|
{
|
|
@@ -69,14 +70,14 @@ public class PwNotifyEngine
|
|
|
private final Writer debugWriter;
|
|
|
private final StringBuffer internalLog = new StringBuffer( );
|
|
|
private final List<UserPermission> permissionList;
|
|
|
+ private final PwNotifyStorageService storageService;
|
|
|
+ private final Supplier<Boolean> cancelFlag;
|
|
|
|
|
|
private final ConditionalTaskExecutor debugOutputTask = new ConditionalTaskExecutor(
|
|
|
this::periodicDebugOutput,
|
|
|
new ConditionalTaskExecutor.TimeDurationPredicate( 1, TimeDuration.Unit.MINUTES )
|
|
|
);
|
|
|
|
|
|
- private EventRateMeter eventRateMeter = new EventRateMeter( TimeDuration.of( 5, TimeDuration.Unit.MINUTES ) );
|
|
|
-
|
|
|
private final AtomicInteger examinedCount = new AtomicInteger( 0 );
|
|
|
private final AtomicInteger noticeCount = new AtomicInteger( 0 );
|
|
|
private Instant startTime;
|
|
@@ -85,10 +86,14 @@ public class PwNotifyEngine
|
|
|
|
|
|
PwNotifyEngine(
|
|
|
final PwmApplication pwmApplication,
|
|
|
+ final PwNotifyStorageService storageService,
|
|
|
+ final Supplier<Boolean> cancelFlag,
|
|
|
final Writer debugWriter
|
|
|
)
|
|
|
{
|
|
|
this.pwmApplication = pwmApplication;
|
|
|
+ this.cancelFlag = cancelFlag;
|
|
|
+ this.storageService = storageService;
|
|
|
this.settings = PwNotifySettings.fromConfiguration( pwmApplication.getConfig() );
|
|
|
this.debugWriter = debugWriter;
|
|
|
this.permissionList = pwmApplication.getConfig().readSettingAsUserPermission( PwmSetting.PW_EXPY_NOTIFY_PERMISSION );
|
|
@@ -133,7 +138,7 @@ public class PwNotifyEngine
|
|
|
internalLog.delete( 0, internalLog.length() );
|
|
|
running = true;
|
|
|
|
|
|
- if ( !canRunOnThisServer() )
|
|
|
+ if ( !canRunOnThisServer() || cancelFlag.get() )
|
|
|
{
|
|
|
return;
|
|
|
}
|
|
@@ -156,36 +161,22 @@ public class PwNotifyEngine
|
|
|
);
|
|
|
|
|
|
log( "ldap search complete, examining users..." );
|
|
|
+
|
|
|
+ final ThreadPoolExecutor threadPoolExecutor = createExecutor( pwmApplication );
|
|
|
while ( workQueue.hasNext() )
|
|
|
{
|
|
|
- if ( !checkIfRunningOnMaster() )
|
|
|
+ if ( !checkIfRunningOnMaster() || cancelFlag.get() )
|
|
|
{
|
|
|
final String msg = "job interrupted, server is no longer the cluster master.";
|
|
|
log( msg );
|
|
|
throw PwmUnrecoverableException.newException( PwmError.ERROR_SERVICE_NOT_AVAILABLE, msg );
|
|
|
}
|
|
|
|
|
|
- checkIfRunningOnMaster( );
|
|
|
-
|
|
|
- final List<UserIdentity> batch = new ArrayList<>( );
|
|
|
- final int batchSize = settings.getBatchCount();
|
|
|
-
|
|
|
- while ( batch.size() < batchSize && workQueue.hasNext() )
|
|
|
- {
|
|
|
- batch.add( workQueue.next() );
|
|
|
- }
|
|
|
+ threadPoolExecutor.submit( new ProcessJob( workQueue.next() ) );
|
|
|
+ }
|
|
|
|
|
|
- final Instant startBatch = Instant.now();
|
|
|
- processBatch( batch );
|
|
|
- eventRateMeter.markEvents( batchSize );
|
|
|
- final TimeDuration batchTime = TimeDuration.fromCurrent( startBatch );
|
|
|
- final TimeDuration pauseTime = TimeDuration.of(
|
|
|
- settings.getBatchTimeMultiplier().multiply( new BigDecimal( batchTime.asMillis() ) ).longValue(),
|
|
|
- TimeDuration.Unit.MILLISECONDS );
|
|
|
- pauseTime.pause();
|
|
|
+ JavaHelper.closeAndWaitExecutor( threadPoolExecutor, TimeDuration.DAY );
|
|
|
|
|
|
- debugOutputTask.conditionallyExecuteTask();
|
|
|
- }
|
|
|
log( "job complete, " + examinedCount + " users evaluated in " + TimeDuration.fromCurrent( startTime ).asCompactString()
|
|
|
+ ", sent " + noticeCount + " notices."
|
|
|
);
|
|
@@ -198,26 +189,46 @@ public class PwNotifyEngine
|
|
|
|
|
|
private void periodicDebugOutput()
|
|
|
{
|
|
|
- log( "job in progress, " + examinedCount + " users evaluated in "
|
|
|
+ final String msg = "job in progress, " + examinedCount + " users evaluated in "
|
|
|
+ TimeDuration.fromCurrent( startTime ).asCompactString()
|
|
|
- + ", sent " + noticeCount + " notices."
|
|
|
- );
|
|
|
+ + ", sent " + noticeCount + " notices.";
|
|
|
+ log( msg );
|
|
|
}
|
|
|
|
|
|
- private void processBatch( final Collection<UserIdentity> batch )
|
|
|
- throws PwmUnrecoverableException
|
|
|
+ private class ProcessJob implements Runnable
|
|
|
{
|
|
|
- for ( final UserIdentity userIdentity : batch )
|
|
|
+ final UserIdentity userIdentity;
|
|
|
+
|
|
|
+ ProcessJob( final UserIdentity userIdentity )
|
|
|
{
|
|
|
- processUserIdentity( userIdentity );
|
|
|
+ this.userIdentity = userIdentity;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void run()
|
|
|
+ {
|
|
|
+ try
|
|
|
+ {
|
|
|
+ processUserIdentity( userIdentity );
|
|
|
+ debugOutputTask.conditionallyExecuteTask();
|
|
|
+ }
|
|
|
+ catch ( Exception e )
|
|
|
+ {
|
|
|
+ LOGGER.trace( "unexpected error processing user '" + userIdentity.toDisplayString() + "', error: " + e.getMessage() );
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private boolean processUserIdentity(
|
|
|
+ private void processUserIdentity(
|
|
|
final UserIdentity userIdentity
|
|
|
)
|
|
|
throws PwmUnrecoverableException
|
|
|
{
|
|
|
+ if ( !canRunOnThisServer() || cancelFlag.get() )
|
|
|
+ {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
examinedCount.incrementAndGet();
|
|
|
final ChaiUser theUser = pwmApplication.getProxiedChaiUser( userIdentity );
|
|
|
final Instant passwordExpirationTime = LdapOperationsHelper.readPasswordExpirationTime( theUser );
|
|
@@ -225,36 +236,31 @@ public class PwNotifyEngine
|
|
|
if ( passwordExpirationTime == null )
|
|
|
{
|
|
|
LOGGER.trace( SESSION_LABEL, "skipping user '" + userIdentity.toDisplayString() + "', has no password expiration" );
|
|
|
- return false;
|
|
|
+ return;
|
|
|
}
|
|
|
|
|
|
if ( passwordExpirationTime.isBefore( Instant.now() ) )
|
|
|
{
|
|
|
LOGGER.trace( SESSION_LABEL, "skipping user '" + userIdentity.toDisplayString() + "', password expiration is in the past" );
|
|
|
- return false;
|
|
|
+ return;
|
|
|
}
|
|
|
|
|
|
final int nextDayInterval = figureNextDayInterval( passwordExpirationTime );
|
|
|
if ( nextDayInterval < 1 )
|
|
|
{
|
|
|
LOGGER.trace( SESSION_LABEL, "skipping user '" + userIdentity.toDisplayString() + "', password expiration time is not within an interval" );
|
|
|
- return false;
|
|
|
+ return;
|
|
|
}
|
|
|
|
|
|
if ( checkIfNoticeAlreadySent( userIdentity, passwordExpirationTime, nextDayInterval ) )
|
|
|
{
|
|
|
log( "notice for interval " + nextDayInterval + " already sent for " + userIdentity.toDisplayString() );
|
|
|
- return false;
|
|
|
+ return;
|
|
|
}
|
|
|
|
|
|
log( "sending notice to " + userIdentity.toDisplayString() + " for interval " + nextDayInterval );
|
|
|
- {
|
|
|
- final PwNotifyDbStorageService dbStorage = new PwNotifyDbStorageService( pwmApplication );
|
|
|
- dbStorage.writeStoredState( userIdentity, SESSION_LABEL, new StoredNotificationState( passwordExpirationTime, Instant.now(), nextDayInterval ) );
|
|
|
- }
|
|
|
-
|
|
|
+ storageService.writeStoredUserState( userIdentity, SESSION_LABEL, new StoredNotificationState( passwordExpirationTime, Instant.now(), nextDayInterval ) );
|
|
|
sendNoticeEmail( userIdentity );
|
|
|
- return true;
|
|
|
}
|
|
|
|
|
|
private int figureNextDayInterval(
|
|
@@ -288,8 +294,7 @@ public class PwNotifyEngine
|
|
|
)
|
|
|
throws PwmUnrecoverableException
|
|
|
{
|
|
|
- final PwNotifyDbStorageService dbStorage = new PwNotifyDbStorageService( pwmApplication );
|
|
|
- final StoredNotificationState storedState = dbStorage.readStoredState( userIdentity, SESSION_LABEL );
|
|
|
+ final StoredNotificationState storedState = storageService.readStoredUserState( userIdentity, SESSION_LABEL );
|
|
|
|
|
|
if ( storedState == null )
|
|
|
{
|
|
@@ -365,4 +370,20 @@ public class PwNotifyEngine
|
|
|
|
|
|
LOGGER.trace( SessionLabel.PWNOTIFY_SESSION_LABEL, output );
|
|
|
}
|
|
|
+
|
|
|
+ private ThreadPoolExecutor createExecutor( final PwmApplication pwmApplication )
|
|
|
+ {
|
|
|
+ final ThreadFactory threadFactory = JavaHelper.makePwmThreadFactory( JavaHelper.makeThreadName( pwmApplication, this.getClass() ), true );
|
|
|
+ final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
|
|
|
+ 1,
|
|
|
+ 10,
|
|
|
+ 1,
|
|
|
+ TimeUnit.MINUTES,
|
|
|
+ new LinkedBlockingDeque<>(),
|
|
|
+ threadFactory
|
|
|
+ );
|
|
|
+ threadPoolExecutor.allowCoreThreadTimeOut( true );
|
|
|
+ return threadPoolExecutor;
|
|
|
+ }
|
|
|
+
|
|
|
}
|