|
@@ -26,7 +26,6 @@ import password.pwm.error.PwmUnrecoverableException;
|
|
|
import password.pwm.util.logging.PwmLogger;
|
|
|
|
|
|
import java.io.File;
|
|
|
-import java.io.FileInputStream;
|
|
|
import java.io.FileNotFoundException;
|
|
|
import java.io.IOException;
|
|
|
import java.io.Serializable;
|
|
@@ -35,14 +34,19 @@ import java.nio.ByteBuffer;
|
|
|
import java.nio.channels.FileChannel;
|
|
|
import java.nio.file.Files;
|
|
|
import java.time.Instant;
|
|
|
-import java.util.ArrayList;
|
|
|
-import java.util.Collections;
|
|
|
import java.util.LinkedHashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Objects;
|
|
|
-import java.util.concurrent.ForkJoinPool;
|
|
|
-import java.util.concurrent.RecursiveTask;
|
|
|
+import java.util.Optional;
|
|
|
+import java.util.Queue;
|
|
|
+import java.util.concurrent.BlockingQueue;
|
|
|
+import java.util.concurrent.ConcurrentLinkedQueue;
|
|
|
+import java.util.concurrent.Executor;
|
|
|
+import java.util.concurrent.ExecutorService;
|
|
|
+import java.util.concurrent.LinkedBlockingQueue;
|
|
|
+import java.util.concurrent.ThreadPoolExecutor;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
import java.util.zip.CRC32;
|
|
@@ -51,90 +55,133 @@ public class FileSystemUtility
|
|
|
{
|
|
|
private static final PwmLogger LOGGER = PwmLogger.forClass( FileSystemUtility.class );
|
|
|
|
|
|
+ private static final int CRC_BUFFER_SIZE = 60 * 1024;
|
|
|
+
|
|
|
private static final AtomicLoopIntIncrementer OP_COUNTER = new AtomicLoopIntIncrementer();
|
|
|
|
|
|
- public static List<FileSummaryInformation> readFileInformation( final File rootFile )
|
|
|
+
|
|
|
+ public static ClosableIterator<FileSummaryInformation> readFileInformation( final List<File> rootFiles )
|
|
|
{
|
|
|
final Instant startTime = Instant.now();
|
|
|
final int operation = OP_COUNTER.next();
|
|
|
- LOGGER.trace( () -> "begin file summary load for file '" + rootFile.getAbsolutePath() + ", operation=" + operation );
|
|
|
- final ForkJoinPool pool = new ForkJoinPool();
|
|
|
- final RecursiveFileReaderTask task = new RecursiveFileReaderTask( rootFile );
|
|
|
- final List<FileSummaryInformation> fileSummaryInformations = pool.invoke( task );
|
|
|
- final AtomicLong byteCount = new AtomicLong( 0 );
|
|
|
- final AtomicInteger fileCount = new AtomicInteger( 0 );
|
|
|
- fileSummaryInformations.forEach( fileSummaryInformation -> byteCount.addAndGet( fileSummaryInformation.getSize() ) );
|
|
|
- fileSummaryInformations.forEach( fileSummaryInformation -> fileCount.incrementAndGet() );
|
|
|
- final Map<String, String> debugInfo = new LinkedHashMap<>();
|
|
|
- debugInfo.put( "operation", Integer.toString( operation ) );
|
|
|
- debugInfo.put( "bytes", StringUtil.formatDiskSizeforDebug( byteCount.get() ) );
|
|
|
- debugInfo.put( "files", Integer.toString( fileCount.get() ) );
|
|
|
- debugInfo.put( "duration", TimeDuration.compactFromCurrent( startTime ) );
|
|
|
- LOGGER.trace( () -> "completed file summary load for file '" + rootFile.getAbsolutePath() + ", " + StringUtil.mapToString( debugInfo ) );
|
|
|
- return fileSummaryInformations;
|
|
|
+
|
|
|
+ final int cpus = Runtime.getRuntime().availableProcessors();
|
|
|
+ final BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>( );
|
|
|
+ final ExecutorService executor = new ThreadPoolExecutor( cpus, cpus, Long.MAX_VALUE, TimeUnit.MILLISECONDS, workQueue );
|
|
|
+ final TaskData taskData = new TaskData( executor );
|
|
|
+
|
|
|
+ for ( final File rootFile : rootFiles )
|
|
|
+ {
|
|
|
+ LOGGER.trace( () -> "begin file summary load for file '" + rootFile.getAbsolutePath() + ", operation=" + operation );
|
|
|
+ executor.execute( new RecursiveFileReaderTask( rootFile, taskData ) );
|
|
|
+ }
|
|
|
+
|
|
|
+ return new ConcurrentClosableIteratorWrapper<>( () ->
|
|
|
+ {
|
|
|
+ while ( taskData.getWorkInProgress().get() > 0 )
|
|
|
+ {
|
|
|
+ final FileSummaryInformation next = taskData.getOutputQueue().poll();
|
|
|
+
|
|
|
+ if ( next == null )
|
|
|
+ {
|
|
|
+ TimeDuration.of( 20, TimeDuration.Unit.MILLISECONDS ).pause();
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ return Optional.of( next );
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return Optional.empty();
|
|
|
+ },
|
|
|
+ () ->
|
|
|
+ {
|
|
|
+ executor.shutdown();
|
|
|
+ final Map<String, String> debugInfo = new LinkedHashMap<>();
|
|
|
+ debugInfo.put( "bytes", StringUtil.formatDiskSizeforDebug( taskData.getByteCount().get() ) );
|
|
|
+ debugInfo.put( "files", Integer.toString( taskData.getFileCount().get() ) );
|
|
|
+ debugInfo.put( "duration", TimeDuration.compactFromCurrent( startTime ) );
|
|
|
+ LOGGER.trace( () -> "completed file summary load for operation '" + operation + ", " + StringUtil.mapToString( debugInfo ) );
|
|
|
+ } );
|
|
|
}
|
|
|
|
|
|
- private static class RecursiveFileReaderTask extends RecursiveTask<List<FileSummaryInformation>>
|
|
|
+ @Value
|
|
|
+ private static class TaskData
|
|
|
+ {
|
|
|
+ private final AtomicLong byteCount = new AtomicLong( 0 );
|
|
|
+ private final AtomicInteger fileCount = new AtomicInteger( 0 );
|
|
|
+ private final AtomicInteger workInProgress = new AtomicInteger( 0 );
|
|
|
+ private final Queue<FileSummaryInformation> outputQueue = new ConcurrentLinkedQueue<>();
|
|
|
+
|
|
|
+ private Executor executor;
|
|
|
+
|
|
|
+ TaskData( final Executor executor )
|
|
|
+ {
|
|
|
+ this.executor = executor;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private static class RecursiveFileReaderTask implements Runnable
|
|
|
{
|
|
|
private final File theFile;
|
|
|
+ private final TaskData taskData;
|
|
|
|
|
|
- RecursiveFileReaderTask( final File theFile )
|
|
|
+ RecursiveFileReaderTask( final File theFile, final TaskData taskData )
|
|
|
{
|
|
|
Objects.requireNonNull( theFile );
|
|
|
+ Objects.requireNonNull( taskData );
|
|
|
this.theFile = theFile;
|
|
|
+ this.taskData = taskData;
|
|
|
+ this.taskData.getWorkInProgress().incrementAndGet();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- protected List<FileSummaryInformation> compute()
|
|
|
+ public void run()
|
|
|
{
|
|
|
- final List<FileSummaryInformation> results = new ArrayList<>();
|
|
|
-
|
|
|
- if ( theFile.isDirectory() )
|
|
|
+ try
|
|
|
{
|
|
|
- final File[] subFiles = theFile.listFiles();
|
|
|
- if ( subFiles != null )
|
|
|
+ if ( theFile.isDirectory() )
|
|
|
{
|
|
|
- final List<RecursiveFileReaderTask> tasks = new ArrayList<>();
|
|
|
- for ( final File file : subFiles )
|
|
|
+ final File[] subFiles = theFile.listFiles();
|
|
|
+ if ( subFiles != null )
|
|
|
{
|
|
|
- final RecursiveFileReaderTask newTask = new RecursiveFileReaderTask( file );
|
|
|
- newTask.fork();
|
|
|
- tasks.add( newTask );
|
|
|
+ for ( final File file : subFiles )
|
|
|
+ {
|
|
|
+ final RecursiveFileReaderTask newTask = new RecursiveFileReaderTask( file, taskData );
|
|
|
+ taskData.getExecutor().execute( newTask );
|
|
|
+ }
|
|
|
}
|
|
|
- tasks.forEach( recursiveFileReaderTask -> results.addAll( recursiveFileReaderTask.join() ) );
|
|
|
}
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- try
|
|
|
+ else
|
|
|
{
|
|
|
- results.add( fileInformationForFile( theFile ) );
|
|
|
- }
|
|
|
- catch ( Exception e )
|
|
|
- {
|
|
|
- LOGGER.debug( () -> "error executing file summary reader: " + e.getMessage() );
|
|
|
+ try
|
|
|
+ {
|
|
|
+ if ( theFile.exists() )
|
|
|
+ {
|
|
|
+ final FileSummaryInformation fileSummaryInformation = new FileSummaryInformation(
|
|
|
+ theFile.getName(),
|
|
|
+ theFile.getParentFile().getAbsolutePath(),
|
|
|
+ Instant.ofEpochMilli( theFile.lastModified() ),
|
|
|
+ theFile.length(),
|
|
|
+ crc32( theFile )
|
|
|
+ );
|
|
|
+ taskData.getByteCount().addAndGet( fileSummaryInformation.getSize() );
|
|
|
+ taskData.getFileCount().incrementAndGet();
|
|
|
+ taskData.getOutputQueue().offer( fileSummaryInformation );
|
|
|
+ }
|
|
|
+ }
|
|
|
+ catch ( Exception e )
|
|
|
+ {
|
|
|
+ LOGGER.debug( () -> "error executing file summary reader: " + e.getMessage() );
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- return Collections.unmodifiableList( results );
|
|
|
+ finally
|
|
|
+ {
|
|
|
+ this.taskData.getWorkInProgress().decrementAndGet();
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private static FileSummaryInformation fileInformationForFile( final File file )
|
|
|
- throws IOException
|
|
|
- {
|
|
|
- if ( file == null || !file.exists() )
|
|
|
- {
|
|
|
- return null;
|
|
|
- }
|
|
|
- return new FileSummaryInformation(
|
|
|
- file.getName(),
|
|
|
- file.getParentFile().getAbsolutePath(),
|
|
|
- Instant.ofEpochMilli( file.lastModified() ),
|
|
|
- file.length(),
|
|
|
- crc32( file )
|
|
|
- );
|
|
|
- }
|
|
|
|
|
|
public static long getFileDirectorySize( final File dir )
|
|
|
{
|
|
@@ -260,9 +307,9 @@ public class FileSystemUtility
|
|
|
throws IOException
|
|
|
{
|
|
|
final CRC32 crc32 = new CRC32();
|
|
|
- final FileInputStream fileInputStream = new FileInputStream( file );
|
|
|
- final FileChannel fileChannel = fileInputStream.getChannel();
|
|
|
- final ByteBuffer byteBuffer = ByteBuffer.allocateDirect( 1024 );
|
|
|
+ final FileChannel fileChannel = FileChannel.open( file.toPath() );
|
|
|
+ final int bufferSize = (int) Math.min( file.length(), CRC_BUFFER_SIZE );
|
|
|
+ final ByteBuffer byteBuffer = ByteBuffer.allocateDirect( bufferSize );
|
|
|
|
|
|
while ( fileChannel.read( byteBuffer ) > 0 )
|
|
|
{
|