|
@@ -20,11 +20,10 @@
|
|
|
|
|
|
package password.pwm.util;
|
|
package password.pwm.util;
|
|
|
|
|
|
-import org.jetbrains.annotations.NotNull;
|
|
|
|
import password.pwm.PwmApplication;
|
|
import password.pwm.PwmApplication;
|
|
import password.pwm.PwmConstants;
|
|
import password.pwm.PwmConstants;
|
|
|
|
+import password.pwm.bean.SessionLabel;
|
|
import password.pwm.error.PwmError;
|
|
import password.pwm.error.PwmError;
|
|
-import password.pwm.error.PwmInternalException;
|
|
|
|
import password.pwm.error.PwmUnrecoverableException;
|
|
import password.pwm.error.PwmUnrecoverableException;
|
|
import password.pwm.util.java.AtomicLoopIntIncrementer;
|
|
import password.pwm.util.java.AtomicLoopIntIncrementer;
|
|
import password.pwm.util.java.StringUtil;
|
|
import password.pwm.util.java.StringUtil;
|
|
@@ -46,85 +45,58 @@ import java.util.concurrent.Executors;
|
|
import java.util.concurrent.Future;
|
|
import java.util.concurrent.Future;
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
import java.util.concurrent.ScheduledExecutorService;
|
|
import java.util.concurrent.ScheduledExecutorService;
|
|
|
|
+import java.util.concurrent.ScheduledFuture;
|
|
|
|
+import java.util.concurrent.ScheduledThreadPoolExecutor;
|
|
import java.util.concurrent.ThreadFactory;
|
|
import java.util.concurrent.ThreadFactory;
|
|
import java.util.concurrent.ThreadPoolExecutor;
|
|
import java.util.concurrent.ThreadPoolExecutor;
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
+import java.util.stream.Collectors;
|
|
|
|
|
|
public class PwmScheduler
|
|
public class PwmScheduler
|
|
{
|
|
{
|
|
private static final PwmLogger LOGGER = PwmLogger.forClass( PwmScheduler.class );
|
|
private static final PwmLogger LOGGER = PwmLogger.forClass( PwmScheduler.class );
|
|
private static final AtomicLoopIntIncrementer THREAD_ID_COUNTER = new AtomicLoopIntIncrementer();
|
|
private static final AtomicLoopIntIncrementer THREAD_ID_COUNTER = new AtomicLoopIntIncrementer();
|
|
|
|
|
|
- private final ScheduledExecutorService applicationExecutorService;
|
|
|
|
private final PwmApplication pwmApplication;
|
|
private final PwmApplication pwmApplication;
|
|
|
|
|
|
public PwmScheduler( final PwmApplication pwmApplication )
|
|
public PwmScheduler( final PwmApplication pwmApplication )
|
|
{
|
|
{
|
|
this.pwmApplication = Objects.requireNonNull( pwmApplication );
|
|
this.pwmApplication = Objects.requireNonNull( pwmApplication );
|
|
- applicationExecutorService = makeSingleThreadExecutorService( pwmApplication.getInstanceID(), this.getClass() );
|
|
|
|
}
|
|
}
|
|
|
|
|
|
public void shutdown()
|
|
public void shutdown()
|
|
{
|
|
{
|
|
- applicationExecutorService.shutdown();
|
|
|
|
}
|
|
}
|
|
|
|
|
|
public void immediateExecuteRunnableInNewThread(
|
|
public void immediateExecuteRunnableInNewThread(
|
|
final Runnable runnable,
|
|
final Runnable runnable,
|
|
- final String threadName
|
|
|
|
- )
|
|
|
|
- {
|
|
|
|
- immediateExecuteCallableInNewThread( Executors.callable( runnable ), threadName );
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- public <V> Future<V> immediateExecuteCallableInNewThread(
|
|
|
|
- final Callable<V> callable,
|
|
|
|
|
|
+ final SessionLabel sessionLabel,
|
|
final String threadName
|
|
final String threadName
|
|
)
|
|
)
|
|
{
|
|
{
|
|
checkIfSchedulerClosed();
|
|
checkIfSchedulerClosed();
|
|
|
|
|
|
- Objects.requireNonNull( callable );
|
|
|
|
-
|
|
|
|
- final String name = "runtime thread #" + THREAD_ID_COUNTER.next() + " " + threadName;
|
|
|
|
-
|
|
|
|
- final ScheduledExecutorService executor = makeSingleThreadExecutorService( pwmApplication.getInstanceID(), callable.getClass() );
|
|
|
|
|
|
+ Objects.requireNonNull( runnable );
|
|
|
|
|
|
- final Callable<V> runnableWrapper = () ->
|
|
|
|
- {
|
|
|
|
- final Instant itemStartTime = Instant.now();
|
|
|
|
- LOGGER.trace( () -> "started " + name );
|
|
|
|
- try
|
|
|
|
- {
|
|
|
|
- final V result = callable.call();
|
|
|
|
- LOGGER.trace( () -> "completed " + name, () -> TimeDuration.fromCurrent( itemStartTime ) );
|
|
|
|
- executor.shutdown();
|
|
|
|
- return result;
|
|
|
|
- }
|
|
|
|
- catch ( final Exception e )
|
|
|
|
- {
|
|
|
|
- LOGGER.error( () -> "error running scheduled immediate task: " + name + ", error: " + e.getMessage(), e );
|
|
|
|
- throw e;
|
|
|
|
- }
|
|
|
|
- };
|
|
|
|
|
|
+ final ExecutorService executor = makeMultiThreadExecutor( 1, pwmApplication, sessionLabel, runnable.getClass() );
|
|
|
|
|
|
- return executor.submit( runnableWrapper );
|
|
|
|
|
|
+ executor.submit( runnable );
|
|
}
|
|
}
|
|
|
|
|
|
public void scheduleDailyZuluZeroStartJob(
|
|
public void scheduleDailyZuluZeroStartJob(
|
|
final Runnable runnable,
|
|
final Runnable runnable,
|
|
- final ExecutorService executorService,
|
|
|
|
- final TimeDuration offset
|
|
|
|
|
|
+ final ScheduledExecutorService executorService,
|
|
|
|
+ final TimeDuration zuluOffset
|
|
)
|
|
)
|
|
{
|
|
{
|
|
final TimeDuration delayTillNextZulu = TimeDuration.fromCurrent( nextZuluZeroTime() );
|
|
final TimeDuration delayTillNextZulu = TimeDuration.fromCurrent( nextZuluZeroTime() );
|
|
- final TimeDuration delayTillNextOffset = delayTillNextZulu.add( offset );
|
|
|
|
|
|
+ final TimeDuration delayTillNextOffset = zuluOffset == null ? TimeDuration.ZERO : delayTillNextZulu.add( zuluOffset );
|
|
scheduleFixedRateJob( runnable, executorService, delayTillNextOffset, TimeDuration.DAY );
|
|
scheduleFixedRateJob( runnable, executorService, delayTillNextOffset, TimeDuration.DAY );
|
|
}
|
|
}
|
|
|
|
|
|
- public Future<?> scheduleJob(
|
|
|
|
|
|
+ public ScheduledFuture<?> scheduleJob(
|
|
final Runnable runnable,
|
|
final Runnable runnable,
|
|
- final ExecutorService executor,
|
|
|
|
|
|
+ final ScheduledExecutorService executor,
|
|
final TimeDuration delay
|
|
final TimeDuration delay
|
|
)
|
|
)
|
|
{
|
|
{
|
|
@@ -134,29 +106,25 @@ public class PwmScheduler
|
|
Objects.requireNonNull( executor );
|
|
Objects.requireNonNull( executor );
|
|
Objects.requireNonNull( delay );
|
|
Objects.requireNonNull( delay );
|
|
|
|
|
|
- if ( applicationExecutorService.isShutdown() )
|
|
|
|
- {
|
|
|
|
- throw new IllegalStateException( "can not schedule job with shutdown scheduler" );
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- final FutureRunner<Object> wrappedRunner = new FutureRunner<>( runnable, executor );
|
|
|
|
- applicationExecutorService.schedule( wrappedRunner, delay.asMillis(), TimeUnit.MILLISECONDS );
|
|
|
|
- return wrappedRunner.getFuture();
|
|
|
|
|
|
+ return executor.schedule( runnable, delay.asMillis(), TimeUnit.MILLISECONDS );
|
|
}
|
|
}
|
|
|
|
|
|
public <T> List<T> executeImmediateThreadPerJobAndAwaitCompletion(
|
|
public <T> List<T> executeImmediateThreadPerJobAndAwaitCompletion(
|
|
- final List<Callable<T>> runnableList,
|
|
|
|
- final String threadNames
|
|
|
|
|
|
+ final int maxThreadCount,
|
|
|
|
+ final List<Callable<T>> callables,
|
|
|
|
+ final SessionLabel sessionLabel,
|
|
|
|
+ final Class<?> theClass
|
|
)
|
|
)
|
|
throws PwmUnrecoverableException
|
|
throws PwmUnrecoverableException
|
|
{
|
|
{
|
|
checkIfSchedulerClosed();
|
|
checkIfSchedulerClosed();
|
|
|
|
|
|
- final List<Future<T>> futures = new ArrayList<>( runnableList.size() );
|
|
|
|
- for ( final Callable<T> callable : runnableList )
|
|
|
|
- {
|
|
|
|
- futures.add( this.immediateExecuteCallableInNewThread( callable, threadNames ) );
|
|
|
|
- }
|
|
|
|
|
|
+ final ExecutorService executor = makeMultiThreadExecutor( maxThreadCount, pwmApplication, sessionLabel, theClass );
|
|
|
|
+
|
|
|
|
+ final List<Future<T>> futures = callables.stream()
|
|
|
|
+ .map( executor::submit )
|
|
|
|
+ .collect( Collectors.toUnmodifiableList() );
|
|
|
|
+
|
|
|
|
|
|
final List<T> results = new ArrayList<>();
|
|
final List<T> results = new ArrayList<>();
|
|
for ( final Future<T> f : futures )
|
|
for ( final Future<T> f : futures )
|
|
@@ -164,6 +132,7 @@ public class PwmScheduler
|
|
results.add( awaitFutureCompletion( f ) );
|
|
results.add( awaitFutureCompletion( f ) );
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ executor.shutdown();
|
|
return Collections.unmodifiableList( results );
|
|
return Collections.unmodifiableList( results );
|
|
}
|
|
}
|
|
|
|
|
|
@@ -195,48 +164,20 @@ public class PwmScheduler
|
|
|
|
|
|
public void scheduleFixedRateJob(
|
|
public void scheduleFixedRateJob(
|
|
final Runnable runnable,
|
|
final Runnable runnable,
|
|
- final ExecutorService executor,
|
|
|
|
|
|
+ final ScheduledExecutorService executor,
|
|
final TimeDuration initialDelay,
|
|
final TimeDuration initialDelay,
|
|
final TimeDuration frequency
|
|
final TimeDuration frequency
|
|
)
|
|
)
|
|
{
|
|
{
|
|
checkIfSchedulerClosed();
|
|
checkIfSchedulerClosed();
|
|
|
|
|
|
-
|
|
|
|
- if ( initialDelay != null )
|
|
|
|
- {
|
|
|
|
- applicationExecutorService.schedule( new FutureRunner( runnable, executor ), initialDelay.asMillis(), TimeUnit.MILLISECONDS );
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- final Runnable jobWithNextScheduler = () ->
|
|
|
|
- {
|
|
|
|
- new FutureRunner( runnable, executor ).run();
|
|
|
|
- scheduleFixedRateJob( runnable, executor, null, frequency );
|
|
|
|
- };
|
|
|
|
-
|
|
|
|
- applicationExecutorService.schedule( jobWithNextScheduler, frequency.asMillis(), TimeUnit.MILLISECONDS );
|
|
|
|
|
|
+ executor.scheduleAtFixedRate( runnable, initialDelay.asMillis(), frequency.asMillis(), TimeUnit.MILLISECONDS );
|
|
}
|
|
}
|
|
|
|
|
|
- public static ExecutorService makeBackgroundExecutor(
|
|
|
|
|
|
+ public static String makeThreadName(
|
|
|
|
+ final SessionLabel sessionLabel,
|
|
final PwmApplication pwmApplication,
|
|
final PwmApplication pwmApplication,
|
|
- final Class<?> clazz
|
|
|
|
- )
|
|
|
|
- {
|
|
|
|
- final ThreadPoolExecutor executor = new ThreadPoolExecutor(
|
|
|
|
- 1,
|
|
|
|
- 1,
|
|
|
|
- 10, TimeUnit.SECONDS,
|
|
|
|
- new LinkedBlockingQueue<>(),
|
|
|
|
- makePwmThreadFactory(
|
|
|
|
- makeThreadName( pwmApplication, clazz ) + "-",
|
|
|
|
- true
|
|
|
|
- ) );
|
|
|
|
- executor.allowCoreThreadTimeOut( true );
|
|
|
|
- return executor;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
-
|
|
|
|
- public static String makeThreadName( final PwmApplication pwmApplication, final Class<?> theClass )
|
|
|
|
|
|
+ final Class<?> theClass )
|
|
{
|
|
{
|
|
String instanceName = "-";
|
|
String instanceName = "-";
|
|
if ( pwmApplication != null )
|
|
if ( pwmApplication != null )
|
|
@@ -244,18 +185,37 @@ public class PwmScheduler
|
|
instanceName = pwmApplication.getInstanceID();
|
|
instanceName = pwmApplication.getInstanceID();
|
|
}
|
|
}
|
|
|
|
|
|
- return makeThreadName( instanceName, theClass );
|
|
|
|
|
|
+ return makeThreadName( sessionLabel, instanceName, theClass );
|
|
}
|
|
}
|
|
|
|
|
|
- public static String makeThreadName( final String instanceID, final Class<?> theClass )
|
|
|
|
|
|
+ public static String makeThreadName(
|
|
|
|
+ final SessionLabel sessionLabel,
|
|
|
|
+ final String instanceID,
|
|
|
|
+ final Class<?> theClass )
|
|
{
|
|
{
|
|
- String instanceName = "-";
|
|
|
|
|
|
+ final StringBuilder output = new StringBuilder();
|
|
|
|
+
|
|
|
|
+ output.append( PwmConstants.PWM_APP_NAME );
|
|
|
|
+
|
|
if ( StringUtil.notEmpty( instanceID ) )
|
|
if ( StringUtil.notEmpty( instanceID ) )
|
|
{
|
|
{
|
|
- instanceName = instanceID;
|
|
|
|
|
|
+ output.append( "-" );
|
|
|
|
+ output.append( instanceID );
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if ( theClass != null )
|
|
|
|
+ {
|
|
|
|
+ output.append( "-" );
|
|
|
|
+ output.append( theClass.getSimpleName() );
|
|
}
|
|
}
|
|
|
|
|
|
- return PwmConstants.PWM_APP_NAME + "-" + instanceName + "-" + theClass.getSimpleName();
|
|
|
|
|
|
+ if ( sessionLabel != null && !StringUtil.isEmpty( sessionLabel.getDomain() ) )
|
|
|
|
+ {
|
|
|
|
+ output.append( "-" );
|
|
|
|
+ output.append( sessionLabel.getDomain() );
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return output.toString();
|
|
}
|
|
}
|
|
|
|
|
|
public static ThreadFactory makePwmThreadFactory( final String namePrefix, final boolean daemon )
|
|
public static ThreadFactory makePwmThreadFactory( final String namePrefix, final boolean daemon )
|
|
@@ -279,108 +239,55 @@ public class PwmScheduler
|
|
};
|
|
};
|
|
}
|
|
}
|
|
|
|
|
|
- public static ScheduledExecutorService makeSingleThreadExecutorService(
|
|
|
|
|
|
+ public static ThreadPoolExecutor makeMultiThreadExecutor(
|
|
|
|
+ final int maxThreadCount,
|
|
final PwmApplication pwmApplication,
|
|
final PwmApplication pwmApplication,
|
|
|
|
+ final SessionLabel sessionLabel,
|
|
final Class<?> theClass
|
|
final Class<?> theClass
|
|
)
|
|
)
|
|
{
|
|
{
|
|
- return makeSingleThreadExecutorService( pwmApplication.getInstanceID(), theClass );
|
|
|
|
|
|
+ return makeMultiThreadExecutor( maxThreadCount, pwmApplication.getInstanceID(), sessionLabel, theClass );
|
|
}
|
|
}
|
|
|
|
|
|
- public static ScheduledExecutorService makeSingleThreadExecutorService(
|
|
|
|
|
|
+ public static ThreadPoolExecutor makeMultiThreadExecutor(
|
|
|
|
+ final int maxThreadCount,
|
|
final String instanceID,
|
|
final String instanceID,
|
|
|
|
+ final SessionLabel sessionLabel,
|
|
final Class<?> theClass
|
|
final Class<?> theClass
|
|
)
|
|
)
|
|
{
|
|
{
|
|
- return Executors.newSingleThreadScheduledExecutor(
|
|
|
|
|
|
+ final ThreadPoolExecutor executor = new ThreadPoolExecutor(
|
|
|
|
+ 1,
|
|
|
|
+ maxThreadCount,
|
|
|
|
+ 1, TimeUnit.SECONDS,
|
|
|
|
+ new LinkedBlockingQueue<>(),
|
|
makePwmThreadFactory(
|
|
makePwmThreadFactory(
|
|
- makeThreadName( instanceID, theClass ) + "-",
|
|
|
|
|
|
+ makeThreadName( sessionLabel, instanceID, theClass ) + "-",
|
|
true
|
|
true
|
|
) );
|
|
) );
|
|
|
|
+ executor.allowCoreThreadTimeOut( true );
|
|
|
|
+ return executor;
|
|
}
|
|
}
|
|
|
|
|
|
- private static class FutureRunner<T> implements Runnable
|
|
|
|
|
|
+ public static ScheduledExecutorService makeBackgroundServiceExecutor(
|
|
|
|
+ final PwmApplication pwmApplication,
|
|
|
|
+ final SessionLabel sessionLabel,
|
|
|
|
+ final Class<?> clazz
|
|
|
|
+ )
|
|
{
|
|
{
|
|
- private final Runnable runnable;
|
|
|
|
- private final ExecutorService executor;
|
|
|
|
- private volatile Future<Object> innerFuture;
|
|
|
|
- private volatile boolean hasFailed;
|
|
|
|
-
|
|
|
|
- enum Flag
|
|
|
|
- {
|
|
|
|
- ShutdownExecutorAfterExecution,
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- FutureRunner( final Runnable runnable, final ExecutorService executor )
|
|
|
|
- {
|
|
|
|
- this.runnable = runnable;
|
|
|
|
- this.executor = executor;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- Future<T> getFuture()
|
|
|
|
- {
|
|
|
|
- return new Future<T>()
|
|
|
|
- {
|
|
|
|
- @Override
|
|
|
|
- public boolean cancel( final boolean mayInterruptIfRunning )
|
|
|
|
- {
|
|
|
|
- return false;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- @Override
|
|
|
|
- public boolean isCancelled()
|
|
|
|
- {
|
|
|
|
- return hasFailed;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- @Override
|
|
|
|
- public boolean isDone()
|
|
|
|
- {
|
|
|
|
- return hasFailed || ( innerFuture != null && innerFuture.isDone() );
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- @Override
|
|
|
|
- public T get()
|
|
|
|
- {
|
|
|
|
- return null;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- @Override
|
|
|
|
- public T get( final long timeout, @NotNull final TimeUnit unit )
|
|
|
|
- {
|
|
|
|
- return null;
|
|
|
|
- }
|
|
|
|
- };
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- @Override
|
|
|
|
- public void run()
|
|
|
|
- {
|
|
|
|
- try
|
|
|
|
- {
|
|
|
|
- if ( !executor.isShutdown() )
|
|
|
|
- {
|
|
|
|
- innerFuture = executor.submit( Executors.callable( runnable ) );
|
|
|
|
- }
|
|
|
|
- else
|
|
|
|
- {
|
|
|
|
- hasFailed = true;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- catch ( final Throwable t )
|
|
|
|
- {
|
|
|
|
- LOGGER.error( () -> "unexpected error running scheduled job: " + t.getMessage(), t );
|
|
|
|
- hasFailed = true;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
|
|
+ final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(
|
|
|
|
+ 1,
|
|
|
|
+ makePwmThreadFactory(
|
|
|
|
+ makeThreadName( sessionLabel, pwmApplication, clazz ) + "-",
|
|
|
|
+ true
|
|
|
|
+ ) );
|
|
|
|
+ executor.setKeepAliveTime( 1, TimeUnit.MINUTES );
|
|
|
|
+ executor.allowCoreThreadTimeOut( true );
|
|
|
|
+ return executor;
|
|
}
|
|
}
|
|
|
|
|
|
private void checkIfSchedulerClosed()
|
|
private void checkIfSchedulerClosed()
|
|
{
|
|
{
|
|
- if ( applicationExecutorService.isShutdown() )
|
|
|
|
- {
|
|
|
|
- throw new PwmInternalException( "scheduler is closed" );
|
|
|
|
- }
|
|
|
|
}
|
|
}
|
|
|
|
|
|
public static Instant nextZuluZeroTime( )
|
|
public static Instant nextZuluZeroTime( )
|