|
@@ -27,21 +27,28 @@ import password.pwm.PwmApplication;
|
|
|
import password.pwm.error.ErrorInformation;
|
|
|
import password.pwm.error.PwmError;
|
|
|
import password.pwm.error.PwmUnrecoverableException;
|
|
|
+import password.pwm.svc.stats.EventRateMeter;
|
|
|
import password.pwm.util.TransactionSizeCalculator;
|
|
|
+import password.pwm.util.java.ConditionalTaskExecutor;
|
|
|
import password.pwm.util.java.JavaHelper;
|
|
|
+import password.pwm.util.java.Percent;
|
|
|
+import password.pwm.util.java.StringUtil;
|
|
|
import password.pwm.util.java.TimeDuration;
|
|
|
import password.pwm.util.localdb.LocalDB;
|
|
|
import password.pwm.util.localdb.LocalDBException;
|
|
|
import password.pwm.util.logging.PwmLogger;
|
|
|
-import password.pwm.util.secure.ChecksumInputStream;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
import java.io.InputStream;
|
|
|
import java.text.DecimalFormat;
|
|
|
import java.text.NumberFormat;
|
|
|
import java.time.Instant;
|
|
|
+import java.util.Collections;
|
|
|
+import java.util.LinkedHashMap;
|
|
|
import java.util.Map;
|
|
|
import java.util.TreeMap;
|
|
|
+import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
+import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
|
|
/**
|
|
|
* @author Jason D. Rivard
|
|
@@ -53,21 +60,19 @@ class Populator
|
|
|
// words truncated to this length, prevents massive words if the input
|
|
|
private static final int MAX_LINE_LENGTH = 64;
|
|
|
|
|
|
- private static final TimeDuration DEBUG_OUTPUT_FREQUENCY = TimeDuration.of( 3, TimeDuration.Unit.MINUTES );
|
|
|
+ private static final TimeDuration DEBUG_OUTPUT_FREQUENCY = TimeDuration.SECONDS_30;
|
|
|
|
|
|
// words tarting with this prefix are ignored.
|
|
|
private static final String COMMENT_PREFIX = "!#comment:";
|
|
|
|
|
|
private static final NumberFormat PERCENT_FORMAT = DecimalFormat.getPercentInstance();
|
|
|
|
|
|
- private final ZipReader zipFileReader;
|
|
|
+ private final WordlistZipReader zipFileReader;
|
|
|
private final StoredWordlistDataBean.Source source;
|
|
|
|
|
|
- private volatile boolean running;
|
|
|
- private volatile boolean abortFlag;
|
|
|
+ private final AtomicBoolean running = new AtomicBoolean( false );
|
|
|
+ private final AtomicBoolean abortFlag = new AtomicBoolean( false );
|
|
|
|
|
|
- private final PopulationStats overallStats = new PopulationStats();
|
|
|
- private PopulationStats perReportStats = new PopulationStats();
|
|
|
private TransactionSizeCalculator transactionCalculator = new TransactionSizeCalculator(
|
|
|
new TransactionSizeCalculator.SettingsBuilder()
|
|
|
.setDurationGoal( TimeDuration.of( 600, TimeDuration.Unit.MILLISECONDS ) )
|
|
@@ -76,16 +81,21 @@ class Populator
|
|
|
.createSettings()
|
|
|
);
|
|
|
|
|
|
- private int loopLines;
|
|
|
-
|
|
|
private final Map<String, String> bufferedWords = new TreeMap<>();
|
|
|
|
|
|
private final LocalDB localDB;
|
|
|
|
|
|
- private final ChecksumInputStream checksumInputStream;
|
|
|
-
|
|
|
private final AbstractWordlist rootWordlist;
|
|
|
|
|
|
+ private final StoredWordlistDataBean.RemoteWordlistInfo remoteWordlistInfo;
|
|
|
+
|
|
|
+ private final Instant startTime = Instant.now();
|
|
|
+
|
|
|
+ private final AtomicLong bytesSkipped = new AtomicLong( 0 );
|
|
|
+
|
|
|
+ private final EventRateMeter.MovingAverage byteRateMeter
|
|
|
+ = new EventRateMeter.MovingAverage( TimeDuration.of( 5, TimeDuration.Unit.MINUTES ) );
|
|
|
+
|
|
|
|
|
|
static
|
|
|
{
|
|
@@ -93,6 +103,7 @@ class Populator
|
|
|
}
|
|
|
|
|
|
Populator(
|
|
|
+ final StoredWordlistDataBean.RemoteWordlistInfo remoteWordlistInfo,
|
|
|
final InputStream inputStream,
|
|
|
final StoredWordlistDataBean.Source source,
|
|
|
final AbstractWordlist rootWordlist,
|
|
@@ -100,81 +111,137 @@ class Populator
|
|
|
)
|
|
|
throws Exception
|
|
|
{
|
|
|
+ this.remoteWordlistInfo = remoteWordlistInfo;
|
|
|
this.source = source;
|
|
|
- this.checksumInputStream = new ChecksumInputStream( AbstractWordlist.CHECKSUM_HASH_ALG, inputStream );
|
|
|
- this.zipFileReader = new ZipReader( checksumInputStream );
|
|
|
+ this.zipFileReader = new WordlistZipReader( inputStream );
|
|
|
this.localDB = pwmApplication.getLocalDB();
|
|
|
this.rootWordlist = rootWordlist;
|
|
|
}
|
|
|
|
|
|
- private void init( ) throws LocalDBException, IOException
|
|
|
+ private void init( ) throws IOException, LocalDBException
|
|
|
{
|
|
|
- if ( abortFlag )
|
|
|
+ if ( abortFlag.get() )
|
|
|
{
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- localDB.truncate( rootWordlist.getWordlistDB() );
|
|
|
+ final long previousBytesRead = rootWordlist.readMetadata().getBytes();
|
|
|
|
|
|
- if ( overallStats.getLines() > 0 )
|
|
|
+ if ( previousBytesRead == 0 )
|
|
|
{
|
|
|
- for ( int i = 0; i < overallStats.getLines(); i++ )
|
|
|
+ LOGGER.debug( "clearing stored wordlist" );
|
|
|
+ localDB.truncate( rootWordlist.getWordlistDB() );
|
|
|
+ }
|
|
|
+
|
|
|
+ if ( previousBytesRead > 0 )
|
|
|
+ {
|
|
|
+ final Instant startSkip = Instant.now();
|
|
|
+ LOGGER.debug( "skipping forward " + previousBytesRead + " bytes in stream that have been previously imported" );
|
|
|
+ while ( !abortFlag.get() && bytesSkipped.get() < previousBytesRead )
|
|
|
{
|
|
|
zipFileReader.nextLine();
|
|
|
+ bytesSkipped.set( zipFileReader.getByteCount() );
|
|
|
}
|
|
|
+ LOGGER.debug( "skipped forward " + previousBytesRead + " bytes in stream (" + TimeDuration.fromCurrent( startSkip ).asCompactString() + ")" );
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public String makeStatString( )
|
|
|
+ String makeStatString( )
|
|
|
{
|
|
|
- if ( !running )
|
|
|
+ if ( !running.get() )
|
|
|
{
|
|
|
return "not running";
|
|
|
}
|
|
|
|
|
|
- final int lps = perReportStats.getElapsedSeconds() <= 0 ? 0 : perReportStats.getLines() / perReportStats.getElapsedSeconds();
|
|
|
+ return rootWordlist.debugLabel + " " + StringUtil.mapToString( makeStatValues() );
|
|
|
+ }
|
|
|
+
|
|
|
+ private Map<String, String> makeStatValues()
|
|
|
+ {
|
|
|
+ final Map<String, String> stats = new LinkedHashMap<>();
|
|
|
+ stats.put( "LinesRead", Long.toString( zipFileReader.getLineCount() ) );
|
|
|
+ stats.put( "BytesRead", Long.toString( zipFileReader.getByteCount() ) );
|
|
|
+ stats.put( "BufferSize", Integer.toString( transactionCalculator.getTransactionSize() ) );
|
|
|
+
|
|
|
+ final long elapsedSeconds = TimeDuration.fromCurrent( startTime ).as( TimeDuration.Unit.SECONDS );
|
|
|
+
|
|
|
+ if ( bytesSkipped.get() > 0 )
|
|
|
+ {
|
|
|
+ stats.put( "BytesSkipped", Long.toString( bytesSkipped.get() ) );
|
|
|
+ }
|
|
|
+
|
|
|
+ if ( elapsedSeconds > 10 )
|
|
|
+ {
|
|
|
+ stats.put( "BytesPerSecond", Double.toString( byteRateMeter.getAverage() * 1000 ) );
|
|
|
+ }
|
|
|
|
|
|
- perReportStats = new PopulationStats();
|
|
|
- return rootWordlist.debugLabel + ", lines/second="
|
|
|
- + lps + ", line=" + overallStats.getLines() + ""
|
|
|
- + " current zipEntry=" + zipFileReader.currentZipName();
|
|
|
+ if ( remoteWordlistInfo != null )
|
|
|
+ {
|
|
|
+ final Percent percent = new Percent( zipFileReader.getByteCount(), remoteWordlistInfo.getBytes() );
|
|
|
+ stats.put( "PercentComplete", percent.pretty( 2 ) );
|
|
|
+ }
|
|
|
+
|
|
|
+ stats.put( "ImportTime", TimeDuration.fromCurrent( startTime ).asCompactString() );
|
|
|
+
|
|
|
+ if ( remoteWordlistInfo != null && zipFileReader.getByteCount() > 1000 )
|
|
|
+ {
|
|
|
+ final long totalBytes = remoteWordlistInfo.getBytes();
|
|
|
+ final long remaingingBytes = totalBytes - zipFileReader.getByteCount();
|
|
|
+ final long remainingSeconds = (long) ( remaingingBytes * byteRateMeter.getAverage() * 1000 );
|
|
|
+
|
|
|
+ stats.put( "EstimatedRemainingTime", TimeDuration.of( remainingSeconds, TimeDuration.Unit.SECONDS ).asCompactString() );
|
|
|
+ }
|
|
|
+
|
|
|
+ return Collections.unmodifiableMap( stats );
|
|
|
}
|
|
|
|
|
|
@SuppressWarnings( "checkstyle:InnerAssignment" )
|
|
|
void populate( ) throws IOException, LocalDBException, PwmUnrecoverableException
|
|
|
{
|
|
|
+ final ConditionalTaskExecutor metaUpdater = new ConditionalTaskExecutor(
|
|
|
+ () -> rootWordlist.writeMetadata( StoredWordlistDataBean.builder()
|
|
|
+ .source( source )
|
|
|
+ .size( rootWordlist.size() )
|
|
|
+ .storeDate( Instant.now() )
|
|
|
+ .remoteInfo( remoteWordlistInfo )
|
|
|
+ .bytes( zipFileReader.getByteCount() )
|
|
|
+ .build() ),
|
|
|
+ new ConditionalTaskExecutor.TimeDurationPredicate( TimeDuration.SECONDS_10 )
|
|
|
+ );
|
|
|
+
|
|
|
+ final ConditionalTaskExecutor debugOutputter = new ConditionalTaskExecutor(
|
|
|
+ () -> LOGGER.debug( makeStatString() ),
|
|
|
+ new ConditionalTaskExecutor.TimeDurationPredicate( DEBUG_OUTPUT_FREQUENCY )
|
|
|
+ );
|
|
|
+
|
|
|
try
|
|
|
{
|
|
|
- rootWordlist.writeMetadata( StoredWordlistDataBean.builder().source( source ).build() );
|
|
|
- running = true;
|
|
|
+ debugOutputter.conditionallyExecuteTask();
|
|
|
+ running.set( true );
|
|
|
init();
|
|
|
|
|
|
- long lastReportTime = System.currentTimeMillis() - ( long ) ( DEBUG_OUTPUT_FREQUENCY.asMillis() * 0.33 );
|
|
|
-
|
|
|
String line;
|
|
|
|
|
|
- while ( !abortFlag && ( line = zipFileReader.nextLine() ) != null )
|
|
|
- {
|
|
|
-
|
|
|
- overallStats.incrementLines();
|
|
|
- perReportStats.incrementLines();
|
|
|
+ long lastBytes = zipFileReader.getByteCount();
|
|
|
|
|
|
+ while ( !abortFlag.get() && ( line = zipFileReader.nextLine() ) != null )
|
|
|
+ {
|
|
|
addLine( line );
|
|
|
- loopLines++;
|
|
|
|
|
|
- if ( TimeDuration.fromCurrent( lastReportTime ).isLongerThan( DEBUG_OUTPUT_FREQUENCY.asMillis() ) )
|
|
|
- {
|
|
|
- LOGGER.info( makeStatString() );
|
|
|
- lastReportTime = System.currentTimeMillis();
|
|
|
- }
|
|
|
+ debugOutputter.conditionallyExecuteTask();
|
|
|
+ final long cycleBytes = zipFileReader.getByteCount() - lastBytes;
|
|
|
+ lastBytes = zipFileReader.getByteCount();
|
|
|
+ byteRateMeter.update( cycleBytes );
|
|
|
|
|
|
if ( bufferedWords.size() > transactionCalculator.getTransactionSize() )
|
|
|
{
|
|
|
flushBuffer();
|
|
|
+ metaUpdater.conditionallyExecuteTask();
|
|
|
}
|
|
|
+
|
|
|
}
|
|
|
|
|
|
- if ( abortFlag )
|
|
|
+ if ( abortFlag.get() )
|
|
|
{
|
|
|
LOGGER.warn( "pausing " + rootWordlist.debugLabel + " population" );
|
|
|
}
|
|
@@ -185,8 +252,8 @@ class Populator
|
|
|
}
|
|
|
finally
|
|
|
{
|
|
|
- running = false;
|
|
|
- IOUtils.closeQuietly( checksumInputStream );
|
|
|
+ running.set( false );
|
|
|
+ IOUtils.closeQuietly( zipFileReader );
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -218,7 +285,7 @@ class Populator
|
|
|
//add the elements
|
|
|
localDB.putAll( rootWordlist.getWordlistDB(), bufferedWords );
|
|
|
|
|
|
- if ( abortFlag )
|
|
|
+ if ( abortFlag.get() )
|
|
|
{
|
|
|
return;
|
|
|
}
|
|
@@ -227,21 +294,8 @@ class Populator
|
|
|
final long commitTime = System.currentTimeMillis() - startTime;
|
|
|
transactionCalculator.recordLastTransactionDuration( commitTime );
|
|
|
|
|
|
- if ( bufferedWords.size() > 0 )
|
|
|
- {
|
|
|
- final StringBuilder sb = new StringBuilder();
|
|
|
- sb.append( rootWordlist.debugLabel ).append( " " );
|
|
|
- sb.append( "read " ).append( loopLines ).append( ", " );
|
|
|
- sb.append( "saved " );
|
|
|
- sb.append( bufferedWords.size() ).append( " words" );
|
|
|
- sb.append( " (" ).append( TimeDuration.of( commitTime, TimeDuration.Unit.MILLISECONDS ).asCompactString() ).append( ")" );
|
|
|
-
|
|
|
- LOGGER.trace( sb.toString() );
|
|
|
- }
|
|
|
-
|
|
|
//clear the buffers.
|
|
|
bufferedWords.clear();
|
|
|
- loopLines = 0;
|
|
|
}
|
|
|
|
|
|
private void populationComplete( )
|
|
@@ -259,14 +313,15 @@ class Populator
|
|
|
final StringBuilder sb = new StringBuilder();
|
|
|
sb.append( rootWordlist.debugLabel );
|
|
|
sb.append( " population complete, added " ).append( wordlistSize );
|
|
|
- sb.append( " total words in " ).append( TimeDuration.of( overallStats.getElapsedSeconds() * 1000, TimeDuration.Unit.MILLISECONDS ).asCompactString() );
|
|
|
+ sb.append( " total words in " ).append( TimeDuration.fromCurrent( startTime ).asCompactString() );
|
|
|
{
|
|
|
final StoredWordlistDataBean storedWordlistDataBean = StoredWordlistDataBean.builder()
|
|
|
- .sha1hash( JavaHelper.binaryArrayToHex( checksumInputStream.closeAndFinalChecksum() ) )
|
|
|
+ .remoteInfo( remoteWordlistInfo )
|
|
|
.size( wordlistSize )
|
|
|
.storeDate( Instant.now() )
|
|
|
.source( source )
|
|
|
- .completed( !abortFlag )
|
|
|
+ .completed( !abortFlag.get() )
|
|
|
+ .bytes( zipFileReader.getByteCount() )
|
|
|
.build();
|
|
|
rootWordlist.writeMetadata( storedWordlistDataBean );
|
|
|
}
|
|
@@ -276,7 +331,7 @@ class Populator
|
|
|
public void cancel( ) throws PwmUnrecoverableException
|
|
|
{
|
|
|
LOGGER.debug( "cancelling in-progress population" );
|
|
|
- abortFlag = true;
|
|
|
+ abortFlag.set( true );
|
|
|
|
|
|
final int maxWaitMs = 1000 * 30;
|
|
|
final Instant startWaitTime = Instant.now();
|
|
@@ -293,28 +348,6 @@ class Populator
|
|
|
|
|
|
public boolean isRunning( )
|
|
|
{
|
|
|
- return running;
|
|
|
- }
|
|
|
-
|
|
|
- private static class PopulationStats
|
|
|
- {
|
|
|
-
|
|
|
- private long startTime = System.currentTimeMillis();
|
|
|
- private int lines;
|
|
|
-
|
|
|
- public int getLines( )
|
|
|
- {
|
|
|
- return lines;
|
|
|
- }
|
|
|
-
|
|
|
- public void incrementLines( )
|
|
|
- {
|
|
|
- lines++;
|
|
|
- }
|
|
|
-
|
|
|
- public int getElapsedSeconds( )
|
|
|
- {
|
|
|
- return ( int ) ( System.currentTimeMillis() - startTime ) / 1000;
|
|
|
- }
|
|
|
+ return running.get();
|
|
|
}
|
|
|
}
|