|
@@ -20,35 +20,28 @@
|
|
|
|
|
|
package password.pwm.svc.report;
|
|
|
|
|
|
-import org.apache.commons.csv.CSVPrinter;
|
|
|
+import lombok.Value;
|
|
|
import org.jetbrains.annotations.NotNull;
|
|
|
import password.pwm.AppAttribute;
|
|
|
-import password.pwm.PwmApplication;
|
|
|
-import password.pwm.PwmConstants;
|
|
|
import password.pwm.PwmDomain;
|
|
|
import password.pwm.bean.SessionLabel;
|
|
|
import password.pwm.bean.UserIdentity;
|
|
|
-import password.pwm.config.AppConfig;
|
|
|
import password.pwm.config.value.data.UserPermission;
|
|
|
import password.pwm.error.ErrorInformation;
|
|
|
import password.pwm.error.PwmError;
|
|
|
import password.pwm.error.PwmException;
|
|
|
-import password.pwm.error.PwmInternalException;
|
|
|
import password.pwm.error.PwmOperationalException;
|
|
|
import password.pwm.error.PwmUnrecoverableException;
|
|
|
import password.pwm.http.bean.DisplayElement;
|
|
|
-import password.pwm.ldap.UserInfoFactory;
|
|
|
import password.pwm.ldap.permission.UserPermissionUtility;
|
|
|
-import password.pwm.user.UserInfo;
|
|
|
import password.pwm.util.EventRateMeter;
|
|
|
+import password.pwm.util.PwmScheduler;
|
|
|
import password.pwm.util.java.ConditionalTaskExecutor;
|
|
|
import password.pwm.util.java.FunctionalReentrantLock;
|
|
|
import password.pwm.util.java.JavaHelper;
|
|
|
import password.pwm.util.java.PwmTimeUtil;
|
|
|
-import password.pwm.util.java.PwmUtil;
|
|
|
import password.pwm.util.java.TimeDuration;
|
|
|
import password.pwm.util.json.JsonFactory;
|
|
|
-import password.pwm.util.json.JsonProvider;
|
|
|
import password.pwm.util.logging.PwmLogLevel;
|
|
|
import password.pwm.util.logging.PwmLogger;
|
|
|
|
|
@@ -59,18 +52,17 @@ import java.math.RoundingMode;
|
|
|
import java.time.Instant;
|
|
|
import java.util.ArrayDeque;
|
|
|
import java.util.ArrayList;
|
|
|
-import java.util.Collection;
|
|
|
-import java.util.Collections;
|
|
|
-import java.util.Iterator;
|
|
|
import java.util.List;
|
|
|
import java.util.Locale;
|
|
|
import java.util.Objects;
|
|
|
+import java.util.Optional;
|
|
|
import java.util.Queue;
|
|
|
-import java.util.concurrent.Callable;
|
|
|
+import java.util.concurrent.CompletionService;
|
|
|
import java.util.concurrent.ExecutionException;
|
|
|
+import java.util.concurrent.ExecutorCompletionService;
|
|
|
import java.util.concurrent.ExecutorService;
|
|
|
import java.util.concurrent.Future;
|
|
|
-import java.util.concurrent.Semaphore;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
import java.util.function.Supplier;
|
|
@@ -83,84 +75,93 @@ public class ReportProcess implements AutoCloseable
|
|
|
|
|
|
private static final FunctionalReentrantLock REPORT_ID_LOCK = new FunctionalReentrantLock();
|
|
|
|
|
|
- private final PwmApplication pwmApplication;
|
|
|
- private final Semaphore reportServiceSemaphore;
|
|
|
+ private final PwmDomain pwmDomain;
|
|
|
+ private final ReportService reportService;
|
|
|
+
|
|
|
private final ReportSettings reportSettings;
|
|
|
- private final Locale locale;
|
|
|
- private final SessionLabel sessionLabel;
|
|
|
|
|
|
private final ConditionalTaskExecutor debugOutputLogger;
|
|
|
private final long reportId;
|
|
|
+ private final ReportProcessRequest reportProcessRequest;
|
|
|
+
|
|
|
private final AtomicLong recordCounter = new AtomicLong();
|
|
|
private final AtomicBoolean inProgress = new AtomicBoolean();
|
|
|
private final AtomicBoolean cancelFlag = new AtomicBoolean();
|
|
|
private final EventRateMeter processRateMeter = new EventRateMeter( TimeDuration.MINUTE.asDuration() );
|
|
|
-
|
|
|
+ private final List<String> recordErrorMessages = new ArrayList<>();
|
|
|
|
|
|
private Instant startTime = Instant.now();
|
|
|
- private ReportSummaryCalculator summaryData = ReportSummaryCalculator.newSummaryData( Collections.singletonList( 1 ) );
|
|
|
+ private ReportSummaryCalculator summaryCalculator = ReportSummaryCalculator.empty();
|
|
|
+ private ReportProcessResult result;
|
|
|
|
|
|
ReportProcess(
|
|
|
- @NotNull final PwmApplication pwmApplication,
|
|
|
- @NotNull final Semaphore reportServiceSemaphore,
|
|
|
- @NotNull final ReportSettings reportSettings,
|
|
|
- final Locale locale,
|
|
|
- @NotNull final SessionLabel sessionLabel
|
|
|
+ final PwmDomain pwmDomain,
|
|
|
+ final ReportService reportService,
|
|
|
+ final ReportProcessRequest reportProcessRequest,
|
|
|
+ final ReportSettings reportSettings
|
|
|
)
|
|
|
{
|
|
|
- this.pwmApplication = Objects.requireNonNull( pwmApplication );
|
|
|
- this.reportServiceSemaphore = Objects.requireNonNull( reportServiceSemaphore );
|
|
|
+ this.pwmDomain = Objects.requireNonNull( pwmDomain );
|
|
|
+ this.reportService = Objects.requireNonNull( reportService );
|
|
|
this.reportSettings = Objects.requireNonNull( reportSettings );
|
|
|
- this.locale = Objects.requireNonNullElse( locale, PwmConstants.DEFAULT_LOCALE );
|
|
|
- this.sessionLabel = sessionLabel;
|
|
|
+ this.reportProcessRequest = reportProcessRequest;
|
|
|
|
|
|
this.reportId = nextProcessId();
|
|
|
|
|
|
this.debugOutputLogger = ConditionalTaskExecutor.forPeriodicTask(
|
|
|
- () -> log( PwmLogLevel.TRACE, () -> " in progress: " + recordCounter.longValue() + " records exported at " + processRateMeter.prettyEps( locale ),
|
|
|
- TimeDuration.fromCurrent( startTime ) ),
|
|
|
+ this::logStatus,
|
|
|
TimeDuration.MINUTE.asDuration() );
|
|
|
}
|
|
|
|
|
|
- private void log( final PwmLogLevel level, final Supplier<String> message, final TimeDuration timeDuration )
|
|
|
+ void log( final PwmLogLevel level, final Supplier<String> message, final TimeDuration timeDuration )
|
|
|
{
|
|
|
final Supplier<String> wrappedMsg = () -> "report #" + reportId + " " + message.get();
|
|
|
- LOGGER.log( level, sessionLabel, wrappedMsg, null, timeDuration );
|
|
|
+ LOGGER.log( level, reportProcessRequest.getSessionLabel(), wrappedMsg, null, timeDuration );
|
|
|
}
|
|
|
|
|
|
- static ReportProcess createReportProcess(
|
|
|
- @NotNull final PwmApplication pwmApplication,
|
|
|
- @NotNull final Semaphore reportServiceSemaphore,
|
|
|
- @NotNull final ReportSettings reportSettings,
|
|
|
- final Locale locale,
|
|
|
- @NotNull final SessionLabel sessionLabel
|
|
|
- )
|
|
|
+ private void logStatus()
|
|
|
{
|
|
|
- return new ReportProcess( pwmApplication, reportServiceSemaphore, reportSettings, locale, sessionLabel );
|
|
|
+ log( PwmLogLevel.TRACE,
|
|
|
+ () -> "in progress: " + recordCounter.longValue()
|
|
|
+ + " records exported at " + processRateMeter.prettyEps( reportProcessRequest.getLocale() )
|
|
|
+ + " records/second, duration: "
|
|
|
+ + TimeDuration.fromCurrent( startTime ).asCompactString()
|
|
|
+ + " jobDetails: "
|
|
|
+ + JsonFactory.get().serialize( reportProcessRequest ),
|
|
|
+ null );
|
|
|
}
|
|
|
|
|
|
- public static void outputSummaryToCsv(
|
|
|
- final AppConfig config,
|
|
|
- final ReportSummaryCalculator reportSummaryData,
|
|
|
- final OutputStream outputStream,
|
|
|
- final Locale locale
|
|
|
- )
|
|
|
- throws IOException
|
|
|
+ boolean isCancelled()
|
|
|
{
|
|
|
+ return cancelFlag.get();
|
|
|
+ }
|
|
|
|
|
|
- final List<ReportSummaryCalculator.PresentationRow> outputList = reportSummaryData.asPresentableCollection( config, locale );
|
|
|
- final CSVPrinter csvPrinter = PwmUtil.makeCsvPrinter( outputStream );
|
|
|
+ PwmDomain getPwmDomain()
|
|
|
+ {
|
|
|
+ return pwmDomain;
|
|
|
+ }
|
|
|
|
|
|
- for ( final ReportSummaryCalculator.PresentationRow presentationRow : outputList )
|
|
|
- {
|
|
|
- csvPrinter.printRecord( presentationRow.toStringList() );
|
|
|
- }
|
|
|
+ SessionLabel getSessionLabel()
|
|
|
+ {
|
|
|
+ return reportProcessRequest.getSessionLabel();
|
|
|
+ }
|
|
|
+
|
|
|
+ Optional<ReportProcessResult> getResult()
|
|
|
+ {
|
|
|
+ return Optional.ofNullable( result );
|
|
|
+ }
|
|
|
|
|
|
- csvPrinter.flush();
|
|
|
+ static ReportProcess createReportProcess(
|
|
|
+ final PwmDomain pwmDomain,
|
|
|
+ final ReportService reportService,
|
|
|
+ final ReportProcessRequest reportProcessRequest,
|
|
|
+ final ReportSettings reportSettings
|
|
|
+ )
|
|
|
+ {
|
|
|
+ return new ReportProcess( pwmDomain, reportService, reportProcessRequest, reportSettings );
|
|
|
}
|
|
|
|
|
|
public void startReport(
|
|
|
- @NotNull final ReportProcessRequest reportProcessRequest,
|
|
|
@NotNull final OutputStream outputStream
|
|
|
)
|
|
|
throws PwmUnrecoverableException
|
|
@@ -173,13 +174,13 @@ public class ReportProcess implements AutoCloseable
|
|
|
throw new PwmUnrecoverableException( PwmError.ERROR_INTERNAL, "report process #" + reportId + " cannot be started, report already in progress" );
|
|
|
}
|
|
|
|
|
|
- if ( !reportServiceSemaphore.tryAcquire() )
|
|
|
+ if ( cancelFlag.get() )
|
|
|
{
|
|
|
- throw new PwmUnrecoverableException( PwmError.ERROR_INTERNAL, "report process #" + reportId + " cannot be started, maximum concurrent reports already in progress." );
|
|
|
+ throw new PwmUnrecoverableException( PwmError.ERROR_INTERNAL, "report process #" + reportId + " cannot be started, report already cancelled" );
|
|
|
}
|
|
|
|
|
|
this.startTime = Instant.now();
|
|
|
- this.summaryData = ReportSummaryCalculator.newSummaryData( reportSettings.getTrackDays() );
|
|
|
+ this.summaryCalculator = ReportSummaryCalculator.newSummaryData( reportSettings.getTrackDays() );
|
|
|
this.recordCounter.set( 0 );
|
|
|
this.processRateMeter.reset();
|
|
|
this.inProgress.set( true );
|
|
@@ -195,13 +196,15 @@ public class ReportProcess implements AutoCloseable
|
|
|
catch ( final PwmException e )
|
|
|
{
|
|
|
log( PwmLogLevel.DEBUG, () -> "error during report generation: " + e.getMessage(), null );
|
|
|
- cancelFlag.set( true );
|
|
|
throw new PwmUnrecoverableException( new ErrorInformation( PwmError.ERROR_INTERNAL, e.getMessage() ) );
|
|
|
}
|
|
|
catch ( final IOException e )
|
|
|
{
|
|
|
log( PwmLogLevel.DEBUG, () -> "I/O error during report generation: " + e.getMessage(), null );
|
|
|
- cancelFlag.set( true );
|
|
|
+ }
|
|
|
+ catch ( final Exception e )
|
|
|
+ {
|
|
|
+ log( PwmLogLevel.DEBUG, () -> "Job interrupted during report generation: " + e.getMessage(), null );
|
|
|
}
|
|
|
finally
|
|
|
{
|
|
@@ -213,22 +216,25 @@ public class ReportProcess implements AutoCloseable
|
|
|
final ReportProcessRequest reportProcessRequest,
|
|
|
final ZipOutputStream zipOutputStream
|
|
|
)
|
|
|
- throws PwmUnrecoverableException, PwmOperationalException, IOException
|
|
|
+ throws PwmUnrecoverableException, PwmOperationalException, IOException, ExecutionException, InterruptedException
|
|
|
{
|
|
|
final ReportRecordWriter recordWriter = reportProcessRequest.getReportType() == ReportProcessRequest.ReportType.json
|
|
|
? new ReportJsonRecordWriter( zipOutputStream )
|
|
|
- : new ReportCsvRecordWriter( zipOutputStream, pwmApplication, locale );
|
|
|
+ : new ReportCsvRecordWriter( zipOutputStream, pwmDomain, reportProcessRequest.getLocale() );
|
|
|
|
|
|
- final boolean recordLimitReached = processReport( reportProcessRequest, zipOutputStream, recordWriter );
|
|
|
+ result = executeDomainReport( reportProcessRequest, zipOutputStream, recordWriter );
|
|
|
+
|
|
|
+ checkCancel( zipOutputStream );
|
|
|
+ ReportProcessUtil.outputSummary( pwmDomain, summaryCalculator, reportProcessRequest.getLocale(), zipOutputStream );
|
|
|
|
|
|
checkCancel( zipOutputStream );
|
|
|
- outputSummary( zipOutputStream );
|
|
|
+ ReportProcessUtil.outputResult( zipOutputStream, result );
|
|
|
|
|
|
checkCancel( zipOutputStream );
|
|
|
- outputResult( reportProcessRequest, zipOutputStream, recordLimitReached );
|
|
|
+ ReportProcessUtil.outputErrors( zipOutputStream, recordErrorMessages );
|
|
|
|
|
|
log( PwmLogLevel.TRACE, () -> "completed report generation with " + recordCounter.longValue() + " records at "
|
|
|
- + processRateMeter.prettyEps( locale ), TimeDuration.fromCurrent( startTime ) );
|
|
|
+ + processRateMeter.prettyEps( reportProcessRequest.getLocale() ), TimeDuration.fromCurrent( startTime ) );
|
|
|
|
|
|
}
|
|
|
|
|
@@ -241,183 +247,136 @@ public class ReportProcess implements AutoCloseable
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void outputResult(
|
|
|
- final ReportProcessRequest request,
|
|
|
- final ZipOutputStream zipOutputStream,
|
|
|
- final boolean recordLimitReached
|
|
|
- )
|
|
|
- throws IOException
|
|
|
- {
|
|
|
- final ReportProcessResult result = new ReportProcessResult(
|
|
|
- request,
|
|
|
- this.recordCounter.get(),
|
|
|
- startTime,
|
|
|
- Instant.now(),
|
|
|
- TimeDuration.fromCurrent( startTime ),
|
|
|
- recordLimitReached );
|
|
|
-
|
|
|
- final String jsonData = JsonFactory.get().serialize( result, ReportProcessResult.class, JsonProvider.Flag.PrettyPrint );
|
|
|
-
|
|
|
- zipOutputStream.putNextEntry( new ZipEntry( "result.json" ) );
|
|
|
- zipOutputStream.write( jsonData.getBytes( PwmConstants.DEFAULT_CHARSET ) );
|
|
|
- zipOutputStream.closeEntry();
|
|
|
- }
|
|
|
-
|
|
|
- private void outputSummary(
|
|
|
- final ZipOutputStream zipOutputStream
|
|
|
- )
|
|
|
- throws IOException
|
|
|
- {
|
|
|
- {
|
|
|
- zipOutputStream.putNextEntry( new ZipEntry( "summary.json" ) );
|
|
|
- outputJsonSummaryToZip( summaryData, zipOutputStream );
|
|
|
- zipOutputStream.closeEntry();
|
|
|
- }
|
|
|
- {
|
|
|
- zipOutputStream.putNextEntry( new ZipEntry( "summary.csv" ) );
|
|
|
- outputSummaryToCsv( pwmApplication.getConfig(), summaryData, zipOutputStream, locale );
|
|
|
- zipOutputStream.closeEntry();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private boolean processReport(
|
|
|
+ private ReportProcessResult executeDomainReport(
|
|
|
final ReportProcessRequest reportProcessRequest,
|
|
|
final ZipOutputStream zipOutputStream,
|
|
|
final ReportRecordWriter recordWriter
|
|
|
)
|
|
|
- throws IOException, PwmUnrecoverableException, PwmOperationalException
|
|
|
+ throws IOException, PwmUnrecoverableException, PwmOperationalException, InterruptedException
|
|
|
{
|
|
|
- zipOutputStream.putNextEntry( new ZipEntry( recordWriter.getZipName() ) );
|
|
|
+ final Instant startTime = Instant.now();
|
|
|
|
|
|
+ zipOutputStream.putNextEntry( new ZipEntry( recordWriter.getZipName() ) );
|
|
|
recordWriter.outputHeader();
|
|
|
|
|
|
- int processCounter = 0;
|
|
|
+ final long jobTimeoutMs = reportSettings.getReportJobTimeout().asMillis();
|
|
|
+
|
|
|
+ int recordCounter = 0;
|
|
|
+ int errorCounter = 0;
|
|
|
boolean recordLimitReached = false;
|
|
|
|
|
|
- for (
|
|
|
- final Iterator<PwmDomain> domainIterator = applicableDomains( reportProcessRequest ).iterator();
|
|
|
- domainIterator.hasNext() && !cancelFlag.get() && !recordLimitReached;
|
|
|
- )
|
|
|
- {
|
|
|
- final PwmDomain pwmDomain = domainIterator.next();
|
|
|
+ final CompletionWrapper<UserReportRecord> completion = createAndSubmitRecordReaderTasks( reportProcessRequest );
|
|
|
|
|
|
- for (
|
|
|
- final Iterator<UserReportRecord> reportRecordQueue = executeUserRecordReadJobs( reportProcessRequest, pwmDomain );
|
|
|
- reportRecordQueue.hasNext() && !cancelFlag.get() && !recordLimitReached;
|
|
|
- )
|
|
|
+ try
|
|
|
+ {
|
|
|
+ while ( recordCounter < completion.getItemCount() && !cancelFlag.get() && !recordLimitReached )
|
|
|
{
|
|
|
- processCounter++;
|
|
|
+ final Future<UserReportRecord> future = completion.getCompletionService().poll( jobTimeoutMs, TimeUnit.MILLISECONDS );
|
|
|
+ recordCounter++;
|
|
|
|
|
|
- if ( processCounter >= reportProcessRequest.getMaximumRecords() )
|
|
|
+ try
|
|
|
{
|
|
|
- recordLimitReached = true;
|
|
|
+ final UserReportRecord nextRecord = future.get();
|
|
|
+ recordWriter.outputRecord( nextRecord );
|
|
|
+ perRecordOutputTasks( nextRecord, zipOutputStream );
|
|
|
+ }
|
|
|
+ catch ( final Exception e )
|
|
|
+ {
|
|
|
+ errorCounter++;
|
|
|
+
|
|
|
+ final String msg = JavaHelper.readHostileExceptionMessage( e.getCause() );
|
|
|
+
|
|
|
+ log( PwmLogLevel.TRACE, () -> msg, null );
|
|
|
+
|
|
|
+ if ( recordErrorMessages.size() < 1000 )
|
|
|
+ {
|
|
|
+ recordErrorMessages.add( msg );
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- final UserReportRecord userReportRecord = reportRecordQueue.next();
|
|
|
- final boolean lastRecord = recordLimitReached || ( !reportRecordQueue.hasNext() && !domainIterator.hasNext() );
|
|
|
- recordWriter.outputRecord( userReportRecord, lastRecord );
|
|
|
- perRecordOutputTasks( userReportRecord, zipOutputStream );
|
|
|
+ if ( recordCounter >= reportProcessRequest.getMaximumRecords() )
|
|
|
+ {
|
|
|
+ recordLimitReached = true;
|
|
|
+ }
|
|
|
}
|
|
|
- }
|
|
|
|
|
|
- recordWriter.outputFooter();
|
|
|
- recordWriter.close();
|
|
|
+ recordWriter.outputFooter();
|
|
|
+ recordWriter.close();
|
|
|
+ }
|
|
|
+ finally
|
|
|
+ {
|
|
|
+ completion.getExecutorService().shutdown();
|
|
|
+ }
|
|
|
|
|
|
- zipOutputStream.closeEntry();
|
|
|
- return recordLimitReached;
|
|
|
+ final Instant finishTime = Instant.now();
|
|
|
+ final TimeDuration duration = TimeDuration.between( startTime, finishTime );
|
|
|
+ return new ReportProcessResult(
|
|
|
+ reportProcessRequest,
|
|
|
+ recordCounter,
|
|
|
+ errorCounter,
|
|
|
+ startTime,
|
|
|
+ finishTime,
|
|
|
+ duration,
|
|
|
+ recordLimitReached,
|
|
|
+ pwmDomain.getPwmApplication().getInstanceID(),
|
|
|
+ Long.toString( reportId ) );
|
|
|
}
|
|
|
|
|
|
- private Iterator<UserReportRecord> executeUserRecordReadJobs(
|
|
|
- final ReportProcessRequest reportProcessRequest,
|
|
|
- final PwmDomain pwmDomain
|
|
|
+ private CompletionWrapper<UserReportRecord> createAndSubmitRecordReaderTasks(
|
|
|
+ final ReportProcessRequest reportProcessRequest
|
|
|
)
|
|
|
throws PwmUnrecoverableException, PwmOperationalException
|
|
|
{
|
|
|
- final Queue<UserIdentity> identityIterator = readUserListFromLdap( reportProcessRequest, pwmDomain );
|
|
|
- final ExecutorService executor = pwmApplication.getReportService().getExecutor();
|
|
|
- final Queue<Future<UserReportRecord>> returnQueue = new ArrayDeque<>();
|
|
|
- while ( !identityIterator.isEmpty() )
|
|
|
- {
|
|
|
- returnQueue.add( executor.submit( new UserReportRecordReaderTask( identityIterator.poll() ) ) );
|
|
|
- }
|
|
|
- return new FutureUserReportRecordIterator( returnQueue );
|
|
|
- }
|
|
|
+ final Queue<UserIdentity> identityIterator = readUserListFromLdap( reportProcessRequest );
|
|
|
|
|
|
- private Collection<PwmDomain> applicableDomains( final ReportProcessRequest reportProcessRequest )
|
|
|
- {
|
|
|
- if ( reportProcessRequest.getDomainID() != null )
|
|
|
+ final ExecutorService executorService = PwmScheduler.makeMultiThreadExecutor(
|
|
|
+ reportSettings.getReportJobThreads(),
|
|
|
+ pwmDomain.getPwmApplication(),
|
|
|
+ getSessionLabel(),
|
|
|
+ ReportProcess.class,
|
|
|
+ "reportId-" + reportId );
|
|
|
+
|
|
|
+ final CompletionService<UserReportRecord> completionService = new ExecutorCompletionService<>( executorService );
|
|
|
+
|
|
|
+ int itemCount = 0;
|
|
|
+ while ( !identityIterator.isEmpty() )
|
|
|
{
|
|
|
- return Collections.singleton( pwmApplication.domains().get( reportProcessRequest.getDomainID() ) );
|
|
|
+ final UserIdentity nextIdentity = identityIterator.poll();
|
|
|
+ final UserReportRecordReaderTask task = new UserReportRecordReaderTask( this, nextIdentity );
|
|
|
+ completionService.submit( task );
|
|
|
+ itemCount++;
|
|
|
}
|
|
|
|
|
|
- return pwmApplication.domains().values();
|
|
|
+ return new CompletionWrapper<>( completionService, executorService, itemCount );
|
|
|
}
|
|
|
|
|
|
private void perRecordOutputTasks( final UserReportRecord userReportRecord, final ZipOutputStream zipOutputStream )
|
|
|
throws IOException
|
|
|
{
|
|
|
checkCancel( zipOutputStream );
|
|
|
- summaryData.update( userReportRecord );
|
|
|
+ summaryCalculator.update( userReportRecord );
|
|
|
recordCounter.incrementAndGet();
|
|
|
processRateMeter.markEvents( 1 );
|
|
|
debugOutputLogger.conditionallyExecuteTask();
|
|
|
-
|
|
|
- log( PwmLogLevel.TRACE, () -> "completed output of user " + UserIdentity.create(
|
|
|
- userReportRecord.getUserDN(),
|
|
|
- userReportRecord.getLdapProfile(),
|
|
|
- userReportRecord.getDomainID() ).toDisplayString(),
|
|
|
- TimeDuration.fromCurrent( startTime ) );
|
|
|
- }
|
|
|
-
|
|
|
- private static void outputJsonSummaryToZip( final ReportSummaryCalculator reportSummary, final OutputStream outputStream )
|
|
|
- {
|
|
|
- try
|
|
|
- {
|
|
|
- final ReportSummaryData data = ReportSummaryData.fromCalculator( reportSummary );
|
|
|
- final String json = JsonFactory.get().serialize( data, ReportSummaryData.class, JsonProvider.Flag.PrettyPrint );
|
|
|
- outputStream.write( json.getBytes( PwmConstants.DEFAULT_CHARSET ) );
|
|
|
- }
|
|
|
- catch ( final IOException e )
|
|
|
- {
|
|
|
- throw new PwmInternalException( e.getMessage(), e );
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private UserReportRecord readUserReportRecord(
|
|
|
- final UserIdentity userIdentity
|
|
|
- )
|
|
|
- {
|
|
|
- try
|
|
|
- {
|
|
|
- final UserInfo userInfo = UserInfoFactory.newUserInfoUsingProxyForOfflineUser(
|
|
|
- pwmApplication,
|
|
|
- sessionLabel,
|
|
|
- userIdentity );
|
|
|
-
|
|
|
- return UserReportRecord.fromUserInfo( userInfo );
|
|
|
- }
|
|
|
- catch ( final PwmUnrecoverableException e )
|
|
|
- {
|
|
|
- throw PwmInternalException.fromPwmException( e );
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
Queue<UserIdentity> readUserListFromLdap(
|
|
|
- final ReportProcessRequest reportProcessRequest,
|
|
|
- final PwmDomain pwmDomain
|
|
|
+ final ReportProcessRequest reportProcessRequest
|
|
|
)
|
|
|
throws PwmUnrecoverableException, PwmOperationalException
|
|
|
{
|
|
|
final Instant loopStartTime = Instant.now();
|
|
|
final int maxSearchSize = ( int ) JavaHelper.rangeCheck( 0, reportSettings.getMaxSearchSize(), reportProcessRequest.getMaximumRecords() );
|
|
|
- log( PwmLogLevel.TRACE, () -> "beginning ldap search process for domain '" + pwmDomain.getDomainID() + "'", null );
|
|
|
|
|
|
+
|
|
|
+ final PwmDomain pwmDomain = getPwmDomain();
|
|
|
+ log( PwmLogLevel.TRACE, () -> "beginning ldap search process for domain '" + pwmDomain.getDomainID() + "'", null );
|
|
|
final List<UserPermission> searchFilters = reportSettings.getSearchFilter().get( pwmDomain.getDomainID() );
|
|
|
|
|
|
final List<UserIdentity> searchResults = UserPermissionUtility.discoverMatchingUsers(
|
|
|
pwmDomain,
|
|
|
searchFilters,
|
|
|
- sessionLabel,
|
|
|
+ reportProcessRequest.getSessionLabel(),
|
|
|
maxSearchSize,
|
|
|
reportSettings.getSearchTimeout() );
|
|
|
|
|
@@ -469,73 +428,26 @@ public class ReportProcess implements AutoCloseable
|
|
|
log( PwmLogLevel.TRACE, () -> "cancelling report process", null );
|
|
|
cancelFlag.set( true );
|
|
|
}
|
|
|
- reportServiceSemaphore.release();
|
|
|
- }
|
|
|
-
|
|
|
- public class UserReportRecordReaderTask implements Callable<UserReportRecord>
|
|
|
- {
|
|
|
- private final UserIdentity userIdentity;
|
|
|
-
|
|
|
- public UserReportRecordReaderTask( final UserIdentity userIdentity )
|
|
|
- {
|
|
|
- this.userIdentity = userIdentity;
|
|
|
- }
|
|
|
|
|
|
- @Override
|
|
|
- public UserReportRecord call()
|
|
|
- {
|
|
|
- if ( cancelFlag.get() )
|
|
|
- {
|
|
|
- throw new RuntimeException( "report process job cancelled" );
|
|
|
- }
|
|
|
- return readUserReportRecord( userIdentity );
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- class FutureUserReportRecordIterator implements Iterator<UserReportRecord>
|
|
|
- {
|
|
|
- private final Queue<Future<UserReportRecord>> reportRecordQueue;
|
|
|
-
|
|
|
- FutureUserReportRecordIterator( final Queue<Future<UserReportRecord>> reportRecordQueue )
|
|
|
- {
|
|
|
- this.reportRecordQueue = reportRecordQueue;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public boolean hasNext()
|
|
|
- {
|
|
|
- return !reportRecordQueue.isEmpty();
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public UserReportRecord next()
|
|
|
- {
|
|
|
- try
|
|
|
- {
|
|
|
- final Future<UserReportRecord> future = reportRecordQueue.poll();
|
|
|
- if ( future == null )
|
|
|
- {
|
|
|
- throw new NoSuchMethodException();
|
|
|
- }
|
|
|
- return future.get();
|
|
|
- }
|
|
|
- catch ( final InterruptedException | ExecutionException | NoSuchMethodException e )
|
|
|
- {
|
|
|
- log( PwmLogLevel.TRACE, () -> "user report record job failure: " + e.getMessage(), null );
|
|
|
- throw new RuntimeException( e );
|
|
|
- }
|
|
|
- }
|
|
|
+ reportService.closeReportProcess( this );
|
|
|
}
|
|
|
|
|
|
private long nextProcessId()
|
|
|
{
|
|
|
return REPORT_ID_LOCK.exec( () ->
|
|
|
{
|
|
|
- final long lastId = pwmApplication.readAppAttribute( AppAttribute.REPORT_COUNTER, Long.class ).orElse( 0L );
|
|
|
+ final long lastId = pwmDomain.getPwmApplication().readAppAttribute( AppAttribute.REPORT_COUNTER, Long.class ).orElse( 0L );
|
|
|
final long nextId = JavaHelper.nextPositiveLong( lastId );
|
|
|
- pwmApplication.writeAppAttribute( AppAttribute.REPORT_COUNTER, nextId );
|
|
|
+ pwmDomain.getPwmApplication().writeAppAttribute( AppAttribute.REPORT_COUNTER, nextId );
|
|
|
return nextId;
|
|
|
} );
|
|
|
}
|
|
|
|
|
|
+ @Value
|
|
|
+ private static class CompletionWrapper<E>
|
|
|
+ {
|
|
|
+ private final CompletionService<E> completionService;
|
|
|
+ private final ExecutorService executorService;
|
|
|
+ private final int itemCount;
|
|
|
+ }
|
|
|
}
|