|
@@ -26,7 +26,6 @@ import password.pwm.util.java.JavaHelper;
|
|
import password.pwm.util.java.TimeDuration;
|
|
import password.pwm.util.java.TimeDuration;
|
|
import password.pwm.util.logging.PwmLogger;
|
|
import password.pwm.util.logging.PwmLogger;
|
|
|
|
|
|
-import java.math.BigInteger;
|
|
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
import java.util.Collection;
|
|
import java.util.Collection;
|
|
import java.util.Collections;
|
|
import java.util.Collections;
|
|
@@ -36,6 +35,7 @@ import java.util.Iterator;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
import java.util.NoSuchElementException;
|
|
import java.util.NoSuchElementException;
|
|
|
|
+import java.util.Objects;
|
|
import java.util.Queue;
|
|
import java.util.Queue;
|
|
import java.util.Set;
|
|
import java.util.Set;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
@@ -47,8 +47,7 @@ import java.util.function.Supplier;
|
|
* A LIFO {@link Queue} implementation backed by a localDB instance. {@code this} instances are internally
|
|
* A LIFO {@link Queue} implementation backed by a localDB instance. {@code this} instances are internally
|
|
* synchronized.
|
|
* synchronized.
|
|
*/
|
|
*/
|
|
-public class
|
|
|
|
-LocalDBStoredQueue implements Queue<String>, Deque<String>
|
|
|
|
|
|
+public class LocalDBStoredQueue implements Queue<String>, Deque<String>
|
|
{
|
|
{
|
|
private static final PwmLogger LOGGER = PwmLogger.forClass( LocalDBStoredQueue.class, true );
|
|
private static final PwmLogger LOGGER = PwmLogger.forClass( LocalDBStoredQueue.class, true );
|
|
private static final int MAX_SIZE = Integer.MAX_VALUE - 3;
|
|
private static final int MAX_SIZE = Integer.MAX_VALUE - 3;
|
|
@@ -142,11 +141,7 @@ LocalDBStoredQueue implements Queue<String>, Deque<String>
|
|
@Override
|
|
@Override
|
|
public Object[] toArray( )
|
|
public Object[] toArray( )
|
|
{
|
|
{
|
|
- final List<Object> returnList = new ArrayList<>();
|
|
|
|
- for ( final Iterator<String> innerIter = this.iterator(); innerIter.hasNext(); )
|
|
|
|
- {
|
|
|
|
- returnList.add( innerIter.next() );
|
|
|
|
- }
|
|
|
|
|
|
+ final List<Object> returnList = new ArrayList<>( this );
|
|
return returnList.toArray();
|
|
return returnList.toArray();
|
|
}
|
|
}
|
|
|
|
|
|
@@ -154,9 +149,9 @@ LocalDBStoredQueue implements Queue<String>, Deque<String>
|
|
public <T> T[] toArray( final T[] a )
|
|
public <T> T[] toArray( final T[] a )
|
|
{
|
|
{
|
|
int index = 0;
|
|
int index = 0;
|
|
- for ( final Iterator<String> innerIter = this.iterator(); innerIter.hasNext(); )
|
|
|
|
|
|
+ for ( final String s : this )
|
|
{
|
|
{
|
|
- a[ index ] = ( T ) innerIter.next();
|
|
|
|
|
|
+ a[index] = ( T ) s;
|
|
index++;
|
|
index++;
|
|
}
|
|
}
|
|
return a;
|
|
return a;
|
|
@@ -246,7 +241,8 @@ LocalDBStoredQueue implements Queue<String>, Deque<String>
|
|
{
|
|
{
|
|
try
|
|
try
|
|
{
|
|
{
|
|
- return internalQueue.size();
|
|
|
|
|
|
+ final long realSize = internalQueue.size();
|
|
|
|
+ return realSize >= Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) realSize;
|
|
}
|
|
}
|
|
catch ( final LocalDBException e )
|
|
catch ( final LocalDBException e )
|
|
{
|
|
{
|
|
@@ -526,7 +522,7 @@ LocalDBStoredQueue implements Queue<String>, Deque<String>
|
|
private Position position;
|
|
private Position position;
|
|
private final InternalQueue internalQueue;
|
|
private final InternalQueue internalQueue;
|
|
private final boolean first;
|
|
private final boolean first;
|
|
- private int queueSizeAtCreate;
|
|
|
|
|
|
+ private final long queueSizeAtCreate;
|
|
private int steps;
|
|
private int steps;
|
|
|
|
|
|
|
|
|
|
@@ -555,7 +551,7 @@ LocalDBStoredQueue implements Queue<String>, Deque<String>
|
|
steps++;
|
|
steps++;
|
|
try
|
|
try
|
|
{
|
|
{
|
|
- final String nextValue = internalQueue.localDB.get( internalQueue.db, position.toString() );
|
|
|
|
|
|
+ final String nextValue = internalQueue.localDB.get( internalQueue.db, position.key() );
|
|
if ( first )
|
|
if ( first )
|
|
{
|
|
{
|
|
position = position.equals( internalQueue.tailPosition ) ? null : position.previous();
|
|
position = position.equals( internalQueue.tailPosition ) ? null : position.previous();
|
|
@@ -582,28 +578,28 @@ LocalDBStoredQueue implements Queue<String>, Deque<String>
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- private static class Position
|
|
|
|
|
|
+ static class Position
|
|
{
|
|
{
|
|
private static final int RADIX = 36;
|
|
private static final int RADIX = 36;
|
|
- private static final BigInteger MAXIMUM_POSITION = new BigInteger( "zzzzzz", RADIX );
|
|
|
|
- private static final BigInteger MINIMUM_POSITION = BigInteger.ZERO;
|
|
|
|
|
|
+ private static final long MAXIMUM_POSITION = Long.parseLong( "zzzzzz", RADIX );
|
|
|
|
+ private static final long MINIMUM_POSITION = 0;
|
|
|
|
|
|
- private final BigInteger bigInt;
|
|
|
|
|
|
+ private final long bigInt;
|
|
|
|
|
|
- private Position( final BigInteger bigInt )
|
|
|
|
|
|
+ private Position( final long bigInt )
|
|
{
|
|
{
|
|
this.bigInt = bigInt;
|
|
this.bigInt = bigInt;
|
|
}
|
|
}
|
|
|
|
|
|
Position( final String bigInt )
|
|
Position( final String bigInt )
|
|
{
|
|
{
|
|
- this.bigInt = new BigInteger( bigInt, RADIX );
|
|
|
|
|
|
+ this.bigInt = Long.parseLong( bigInt, RADIX );
|
|
}
|
|
}
|
|
|
|
|
|
public Position next( )
|
|
public Position next( )
|
|
{
|
|
{
|
|
- BigInteger next = bigInt.add( BigInteger.ONE );
|
|
|
|
- if ( next.compareTo( MAXIMUM_POSITION ) > 0 )
|
|
|
|
|
|
+ long next = bigInt + 1;
|
|
|
|
+ if ( next > MAXIMUM_POSITION )
|
|
{
|
|
{
|
|
next = MINIMUM_POSITION;
|
|
next = MINIMUM_POSITION;
|
|
}
|
|
}
|
|
@@ -612,38 +608,43 @@ LocalDBStoredQueue implements Queue<String>, Deque<String>
|
|
|
|
|
|
public Position previous( )
|
|
public Position previous( )
|
|
{
|
|
{
|
|
- BigInteger previous = bigInt.subtract( BigInteger.ONE );
|
|
|
|
- if ( previous.compareTo( MINIMUM_POSITION ) < 0 )
|
|
|
|
|
|
+ long previous = bigInt - 1;
|
|
|
|
+ if ( previous < MINIMUM_POSITION )
|
|
{
|
|
{
|
|
previous = MAXIMUM_POSITION;
|
|
previous = MAXIMUM_POSITION;
|
|
}
|
|
}
|
|
return new Position( previous );
|
|
return new Position( previous );
|
|
}
|
|
}
|
|
|
|
|
|
- public BigInteger distanceToHead( final Position head )
|
|
|
|
|
|
+ public long distanceToHead( final Position head )
|
|
{
|
|
{
|
|
- final int compareToValue = head.bigInt.compareTo( this.bigInt );
|
|
|
|
|
|
+ final int compareToValue = Long.compare( head.bigInt, this.bigInt );
|
|
if ( compareToValue == 0 )
|
|
if ( compareToValue == 0 )
|
|
{
|
|
{
|
|
- return BigInteger.ZERO;
|
|
|
|
|
|
+ return 0;
|
|
}
|
|
}
|
|
else if ( compareToValue == 1 )
|
|
else if ( compareToValue == 1 )
|
|
{
|
|
{
|
|
- return head.bigInt.subtract( this.bigInt );
|
|
|
|
|
|
+ return head.bigInt - this.bigInt;
|
|
}
|
|
}
|
|
|
|
|
|
- final BigInteger tailToMax = MAXIMUM_POSITION.subtract( this.bigInt );
|
|
|
|
- final BigInteger minToHead = head.bigInt.subtract( MINIMUM_POSITION );
|
|
|
|
- return minToHead.add( tailToMax ).add( BigInteger.ONE );
|
|
|
|
|
|
+ final long tailToMax = MAXIMUM_POSITION - this.bigInt;
|
|
|
|
+ final long minToHead = head.bigInt - MINIMUM_POSITION;
|
|
|
|
+ return minToHead + tailToMax + 1;
|
|
}
|
|
}
|
|
|
|
|
|
public String toString( )
|
|
public String toString( )
|
|
|
|
+ {
|
|
|
|
+ return key();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public String key()
|
|
{
|
|
{
|
|
final StringBuilder sb = new StringBuilder();
|
|
final StringBuilder sb = new StringBuilder();
|
|
- sb.append( bigInt.toString( RADIX ).toUpperCase() );
|
|
|
|
|
|
+ sb.append( Long.toString( bigInt, RADIX ).toUpperCase() );
|
|
while ( sb.length() < 6 )
|
|
while ( sb.length() < 6 )
|
|
{
|
|
{
|
|
- sb.insert( 0, "0" );
|
|
|
|
|
|
+ sb.insert( 0, '0' );
|
|
}
|
|
}
|
|
return sb.toString();
|
|
return sb.toString();
|
|
}
|
|
}
|
|
@@ -659,16 +660,14 @@ LocalDBStoredQueue implements Queue<String>, Deque<String>
|
|
{
|
|
{
|
|
return false;
|
|
return false;
|
|
}
|
|
}
|
|
-
|
|
|
|
final Position position = ( Position ) o;
|
|
final Position position = ( Position ) o;
|
|
-
|
|
|
|
- return bigInt.equals( position.bigInt );
|
|
|
|
|
|
+ return bigInt == position.bigInt;
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
- public int hashCode( )
|
|
|
|
|
|
+ public int hashCode()
|
|
{
|
|
{
|
|
- return bigInt.hashCode();
|
|
|
|
|
|
+ return Objects.hash( bigInt );
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -734,10 +733,12 @@ LocalDBStoredQueue implements Queue<String>, Deque<String>
|
|
tailPosition = tailPositionStr != null && tailPositionStr.length() > 0 ? new Position( tailPositionStr ) : new Position( "0" );
|
|
tailPosition = tailPositionStr != null && tailPositionStr.length() > 0 ? new Position( tailPositionStr ) : new Position( "0" );
|
|
|
|
|
|
{
|
|
{
|
|
- final int finalSize = this.size();
|
|
|
|
|
|
+ final long finalSize = this.size();
|
|
LOGGER.trace( () -> "loaded for db " + db + "; headPosition=" + headPosition + ", tailPosition=" + tailPosition + ", size=" + finalSize );
|
|
LOGGER.trace( () -> "loaded for db " + db + "; headPosition=" + headPosition + ", tailPosition=" + tailPosition + ", size=" + finalSize );
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ checkSize();
|
|
|
|
+
|
|
repair();
|
|
repair();
|
|
|
|
|
|
debugOutput( "post init()" );
|
|
debugOutput( "post init()" );
|
|
@@ -778,7 +779,7 @@ LocalDBStoredQueue implements Queue<String>, Deque<String>
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- public int size( )
|
|
|
|
|
|
+ public long size( )
|
|
throws LocalDBException
|
|
throws LocalDBException
|
|
{
|
|
{
|
|
lock.readLock().lock();
|
|
lock.readLock().lock();
|
|
@@ -792,14 +793,14 @@ LocalDBStoredQueue implements Queue<String>, Deque<String>
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- private int internalSize( )
|
|
|
|
|
|
+ private long internalSize( )
|
|
throws LocalDBException
|
|
throws LocalDBException
|
|
{
|
|
{
|
|
if ( headPosition.equals( tailPosition ) && localDB.get( db, headPosition.toString() ) == null )
|
|
if ( headPosition.equals( tailPosition ) && localDB.get( db, headPosition.toString() ) == null )
|
|
{
|
|
{
|
|
return 0;
|
|
return 0;
|
|
}
|
|
}
|
|
- return tailPosition.distanceToHead( headPosition ).intValue() + 1;
|
|
|
|
|
|
+ return tailPosition.distanceToHead( headPosition ) + 1;
|
|
}
|
|
}
|
|
|
|
|
|
List<String> removeFirst( final int removalCount, final boolean returnValues ) throws LocalDBException
|
|
List<String> removeFirst( final int removalCount, final boolean returnValues ) throws LocalDBException
|
|
@@ -808,36 +809,9 @@ LocalDBStoredQueue implements Queue<String>, Deque<String>
|
|
try
|
|
try
|
|
{
|
|
{
|
|
debugOutput( "pre removeFirst()" );
|
|
debugOutput( "pre removeFirst()" );
|
|
-
|
|
|
|
- if ( removalCount < 1 )
|
|
|
|
- {
|
|
|
|
- return Collections.emptyList();
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- final List<String> removalKeys = new ArrayList<>();
|
|
|
|
- final List<String> removedValues = new ArrayList<>();
|
|
|
|
- Position previousHead = headPosition;
|
|
|
|
- int removedPositions = 0;
|
|
|
|
- while ( removedPositions < removalCount )
|
|
|
|
- {
|
|
|
|
- removalKeys.add( previousHead.toString() );
|
|
|
|
- if ( returnValues )
|
|
|
|
- {
|
|
|
|
- final String loopValue = localDB.get( db, previousHead.toString() );
|
|
|
|
- if ( loopValue != null )
|
|
|
|
- {
|
|
|
|
- removedValues.add( loopValue );
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- previousHead = previousHead.equals( tailPosition ) ? previousHead : previousHead.previous();
|
|
|
|
- removedPositions++;
|
|
|
|
- }
|
|
|
|
- localDB.removeAll( db, removalKeys );
|
|
|
|
- localDB.put( db, KEY_HEAD_POSITION, previousHead.toString() );
|
|
|
|
- headPosition = previousHead;
|
|
|
|
-
|
|
|
|
|
|
+ final List<String> removedValues = removeImpl( removalCount, returnValues, true );
|
|
debugOutput( "post removeFirst()" );
|
|
debugOutput( "post removeFirst()" );
|
|
- return Collections.unmodifiableList( removedValues );
|
|
|
|
|
|
+ return removedValues;
|
|
}
|
|
}
|
|
finally
|
|
finally
|
|
{
|
|
{
|
|
@@ -851,36 +825,9 @@ LocalDBStoredQueue implements Queue<String>, Deque<String>
|
|
try
|
|
try
|
|
{
|
|
{
|
|
debugOutput( "pre removeLast()" );
|
|
debugOutput( "pre removeLast()" );
|
|
-
|
|
|
|
- if ( removalCount < 1 )
|
|
|
|
- {
|
|
|
|
- return Collections.emptyList();
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- final List<String> removalKeys = new ArrayList<>();
|
|
|
|
- final List<String> removedValues = new ArrayList<>();
|
|
|
|
- Position nextTail = tailPosition;
|
|
|
|
- int removedPositions = 0;
|
|
|
|
- while ( removedPositions < removalCount )
|
|
|
|
- {
|
|
|
|
- removalKeys.add( nextTail.toString() );
|
|
|
|
- if ( returnValues )
|
|
|
|
- {
|
|
|
|
- final String loopValue = localDB.get( db, nextTail.toString() );
|
|
|
|
- if ( loopValue != null )
|
|
|
|
- {
|
|
|
|
- removedValues.add( loopValue );
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- nextTail = nextTail.equals( headPosition ) ? nextTail : nextTail.next();
|
|
|
|
- removedPositions++;
|
|
|
|
- }
|
|
|
|
- localDB.removeAll( db, removalKeys );
|
|
|
|
- localDB.put( db, KEY_TAIL_POSITION, nextTail.toString() );
|
|
|
|
- tailPosition = nextTail;
|
|
|
|
-
|
|
|
|
|
|
+ final List<String> removedValues = removeImpl( removalCount, returnValues, false );
|
|
debugOutput( "post removeLast()" );
|
|
debugOutput( "post removeLast()" );
|
|
- return Collections.unmodifiableList( removedValues );
|
|
|
|
|
|
+ return removedValues;
|
|
}
|
|
}
|
|
finally
|
|
finally
|
|
{
|
|
{
|
|
@@ -888,43 +835,64 @@ LocalDBStoredQueue implements Queue<String>, Deque<String>
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- void addFirst( final Collection<String> values )
|
|
|
|
|
|
+ private List<String> removeImpl( final int removalCount, final boolean returnValues, final boolean forward )
|
|
throws LocalDBException
|
|
throws LocalDBException
|
|
{
|
|
{
|
|
- lock.writeLock().lock();
|
|
|
|
- try
|
|
|
|
|
|
+ if ( removalCount < 1 )
|
|
{
|
|
{
|
|
- debugOutput( "pre addFirst()" );
|
|
|
|
- if ( JavaHelper.isEmpty( values ) )
|
|
|
|
- {
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
|
|
+ return Collections.emptyList();
|
|
|
|
+ }
|
|
|
|
|
|
- if ( internalSize() + values.size() > MAX_SIZE )
|
|
|
|
|
|
+ final List<String> removalKeys = new ArrayList<>();
|
|
|
|
+ final List<String> removedValues = new ArrayList<>();
|
|
|
|
+ Position loopPosition = forward ? headPosition : tailPosition;
|
|
|
|
+ int removedPositions = 0;
|
|
|
|
+ while ( removedPositions < removalCount )
|
|
|
|
+ {
|
|
|
|
+ removalKeys.add( loopPosition.key() );
|
|
|
|
+ if ( returnValues )
|
|
{
|
|
{
|
|
- throw new IllegalStateException( "queue overflow" );
|
|
|
|
|
|
+ final String loopValue = localDB.get( db, loopPosition.key() );
|
|
|
|
+ if ( loopValue != null )
|
|
|
|
+ {
|
|
|
|
+ removedValues.add( loopValue );
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
- final Iterator<String> valueIterator = values.iterator();
|
|
|
|
-
|
|
|
|
- final Map<String, String> keyValueMap = new HashMap<>();
|
|
|
|
- Position nextHead = headPosition;
|
|
|
|
-
|
|
|
|
- if ( internalSize() == 0 )
|
|
|
|
|
|
+ if ( forward )
|
|
{
|
|
{
|
|
- keyValueMap.put( nextHead.toString(), valueIterator.next() );
|
|
|
|
|
|
+ loopPosition = loopPosition.equals( tailPosition ) ? loopPosition : loopPosition.previous();
|
|
}
|
|
}
|
|
-
|
|
|
|
- while ( valueIterator.hasNext() )
|
|
|
|
|
|
+ else
|
|
{
|
|
{
|
|
- nextHead = nextHead.next();
|
|
|
|
- keyValueMap.put( nextHead.toString(), valueIterator.next() );
|
|
|
|
|
|
+ loopPosition = loopPosition.equals( headPosition ) ? loopPosition : loopPosition.next();
|
|
}
|
|
}
|
|
|
|
|
|
- keyValueMap.put( KEY_HEAD_POSITION, String.valueOf( nextHead ) );
|
|
|
|
- localDB.putAll( db, keyValueMap );
|
|
|
|
- headPosition = nextHead;
|
|
|
|
|
|
+ removedPositions++;
|
|
|
|
+ }
|
|
|
|
+ localDB.removeAll( db, removalKeys );
|
|
|
|
+ localDB.put( db, forward ? KEY_HEAD_POSITION : KEY_TAIL_POSITION, loopPosition.key() );
|
|
|
|
+
|
|
|
|
+ if ( forward )
|
|
|
|
+ {
|
|
|
|
+ headPosition = loopPosition;
|
|
|
|
+ }
|
|
|
|
+ else
|
|
|
|
+ {
|
|
|
|
+ tailPosition = loopPosition;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return Collections.unmodifiableList( removedValues );
|
|
|
|
+ }
|
|
|
|
|
|
|
|
+ void addFirst( final Collection<String> values )
|
|
|
|
+ throws LocalDBException
|
|
|
|
+ {
|
|
|
|
+ lock.writeLock().lock();
|
|
|
|
+ try
|
|
|
|
+ {
|
|
|
|
+ debugOutput( "pre addFirst()" );
|
|
|
|
+ addImpl( values, true );
|
|
debugOutput( "post addFirst()" );
|
|
debugOutput( "post addFirst()" );
|
|
}
|
|
}
|
|
finally
|
|
finally
|
|
@@ -939,43 +907,59 @@ LocalDBStoredQueue implements Queue<String>, Deque<String>
|
|
try
|
|
try
|
|
{
|
|
{
|
|
debugOutput( "pre addLast()" );
|
|
debugOutput( "pre addLast()" );
|
|
- if ( JavaHelper.isEmpty( values ) )
|
|
|
|
- {
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
|
|
+ addImpl( values, false );
|
|
|
|
+ debugOutput( "post addLast()" );
|
|
|
|
+ }
|
|
|
|
+ finally
|
|
|
|
+ {
|
|
|
|
+ lock.writeLock().unlock();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
|
|
- if ( internalSize() + values.size() > MAX_SIZE )
|
|
|
|
- {
|
|
|
|
- throw new IllegalStateException( "queue overflow" );
|
|
|
|
- }
|
|
|
|
|
|
+ private void addImpl( final Collection<String> values, final boolean forward )
|
|
|
|
+ throws LocalDBException
|
|
|
|
+ {
|
|
|
|
+ if ( JavaHelper.isEmpty( values ) )
|
|
|
|
+ {
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
|
|
- final Iterator<String> valueIterator = values.iterator();
|
|
|
|
|
|
+ if ( internalSize() + values.size() > MAX_SIZE )
|
|
|
|
+ {
|
|
|
|
+ throw new IllegalStateException( "queue overflow" );
|
|
|
|
+ }
|
|
|
|
|
|
- final Map<String, String> keyValueMap = new HashMap<>();
|
|
|
|
- Position nextTail = tailPosition;
|
|
|
|
|
|
+ final Iterator<String> valueIterator = values.iterator();
|
|
|
|
|
|
- if ( internalSize() == 0 )
|
|
|
|
- {
|
|
|
|
- keyValueMap.put( nextTail.toString(), valueIterator.next() );
|
|
|
|
- }
|
|
|
|
|
|
+ final Map<String, String> keyValueMap = new HashMap<>();
|
|
|
|
+ Position loopPosition = forward ? headPosition : tailPosition;
|
|
|
|
|
|
- while ( valueIterator.hasNext() )
|
|
|
|
- {
|
|
|
|
- nextTail = nextTail.previous();
|
|
|
|
- keyValueMap.put( nextTail.toString(), valueIterator.next() );
|
|
|
|
- }
|
|
|
|
- keyValueMap.put( KEY_TAIL_POSITION, String.valueOf( nextTail ) );
|
|
|
|
- localDB.putAll( db, keyValueMap );
|
|
|
|
- tailPosition = nextTail;
|
|
|
|
|
|
+ if ( internalSize() == 0 )
|
|
|
|
+ {
|
|
|
|
+ keyValueMap.put( loopPosition.toString(), valueIterator.next() );
|
|
|
|
+ }
|
|
|
|
|
|
- debugOutput( "post addLast()" );
|
|
|
|
|
|
+ while ( valueIterator.hasNext() )
|
|
|
|
+ {
|
|
|
|
+ loopPosition = forward ? loopPosition.next() : loopPosition.previous();
|
|
|
|
+ keyValueMap.put( loopPosition.key(), valueIterator.next() );
|
|
}
|
|
}
|
|
- finally
|
|
|
|
|
|
+
|
|
|
|
+ keyValueMap.put( forward ? KEY_HEAD_POSITION : KEY_TAIL_POSITION, loopPosition.key() );
|
|
|
|
+ localDB.putAll( db, keyValueMap );
|
|
|
|
+
|
|
|
|
+ if ( forward )
|
|
{
|
|
{
|
|
- lock.writeLock().unlock();
|
|
|
|
|
|
+ headPosition = loopPosition;
|
|
|
|
+ }
|
|
|
|
+ else
|
|
|
|
+ {
|
|
|
|
+ tailPosition = loopPosition;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+
|
|
|
|
+
|
|
List<String> getFirst( final int count )
|
|
List<String> getFirst( final int count )
|
|
throws LocalDBException
|
|
throws LocalDBException
|
|
{
|
|
{
|
|
@@ -983,29 +967,8 @@ LocalDBStoredQueue implements Queue<String>, Deque<String>
|
|
try
|
|
try
|
|
{
|
|
{
|
|
debugOutput( "pre getFirst()" );
|
|
debugOutput( "pre getFirst()" );
|
|
-
|
|
|
|
- int getCount = count;
|
|
|
|
- if ( getCount < 1 )
|
|
|
|
- {
|
|
|
|
- return Collections.emptyList();
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- if ( getCount > internalSize() )
|
|
|
|
- {
|
|
|
|
- getCount = internalSize();
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- final List<String> returnList = new ArrayList<>();
|
|
|
|
-
|
|
|
|
- Position nextHead = headPosition;
|
|
|
|
- while ( returnList.size() < getCount )
|
|
|
|
- {
|
|
|
|
- returnList.add( localDB.get( db, nextHead.toString() ) );
|
|
|
|
- nextHead = nextHead.previous();
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
|
|
+ final List<String> returnList = getImpl( count, true );
|
|
debugOutput( "post getFirst()" );
|
|
debugOutput( "post getFirst()" );
|
|
-
|
|
|
|
return returnList;
|
|
return returnList;
|
|
}
|
|
}
|
|
finally
|
|
finally
|
|
@@ -1021,29 +984,8 @@ LocalDBStoredQueue implements Queue<String>, Deque<String>
|
|
try
|
|
try
|
|
{
|
|
{
|
|
debugOutput( "pre getLast()" );
|
|
debugOutput( "pre getLast()" );
|
|
-
|
|
|
|
- int getCount = count;
|
|
|
|
- if ( getCount < 1 )
|
|
|
|
- {
|
|
|
|
- return Collections.emptyList();
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- if ( getCount > internalSize() )
|
|
|
|
- {
|
|
|
|
- getCount = internalSize();
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- final List<String> returnList = new ArrayList<>();
|
|
|
|
-
|
|
|
|
- Position nextTail = tailPosition;
|
|
|
|
- while ( returnList.size() < getCount )
|
|
|
|
- {
|
|
|
|
- returnList.add( localDB.get( db, nextTail.toString() ) );
|
|
|
|
- nextTail = nextTail.next();
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
|
|
+ final List<String> returnList = getImpl( count, false );
|
|
debugOutput( "post getLast()" );
|
|
debugOutput( "post getLast()" );
|
|
-
|
|
|
|
return returnList;
|
|
return returnList;
|
|
}
|
|
}
|
|
finally
|
|
finally
|
|
@@ -1052,6 +994,32 @@ LocalDBStoredQueue implements Queue<String>, Deque<String>
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private List<String> getImpl( final long count, final boolean forward )
|
|
|
|
+ throws LocalDBException
|
|
|
|
+ {
|
|
|
|
+ long getCount = count;
|
|
|
|
+ if ( getCount < 1 )
|
|
|
|
+ {
|
|
|
|
+ return Collections.emptyList();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if ( getCount > internalSize() )
|
|
|
|
+ {
|
|
|
|
+ getCount = internalSize();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ final List<String> returnList = new ArrayList<>();
|
|
|
|
+
|
|
|
|
+ Position nextPosition = forward ? headPosition : tailPosition;
|
|
|
|
+ while ( returnList.size() < getCount )
|
|
|
|
+ {
|
|
|
|
+ returnList.add( localDB.get( db, nextPosition.key() ) );
|
|
|
|
+ nextPosition = forward ? nextPosition.previous() : nextPosition.next();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return Collections.unmodifiableList( returnList );
|
|
|
|
+ }
|
|
|
|
+
|
|
void debugOutput( final String input )
|
|
void debugOutput( final String input )
|
|
{
|
|
{
|
|
if ( !developerDebug || DEBUG_IGNORED_DB.contains( db ) )
|
|
if ( !developerDebug || DEBUG_IGNORED_DB.contains( db ) )
|
|
@@ -1095,78 +1063,93 @@ LocalDBStoredQueue implements Queue<String>, Deque<String>
|
|
LOGGER.trace( debugOutput );
|
|
LOGGER.trace( debugOutput );
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private void checkSize()
|
|
|
|
+ throws LocalDBException
|
|
|
|
+ {
|
|
|
|
+ final long dbSize = localDB.size( db );
|
|
|
|
+ final long positionDistance = tailPosition.distanceToHead( headPosition );
|
|
|
|
+
|
|
|
|
+ // +3 for header/tail position and version keys.
|
|
|
|
+ if ( dbSize != positionDistance + 3 )
|
|
|
|
+ {
|
|
|
|
+ LOGGER.warn( () -> "dbSize=" + dbSize + " and positionDistance=" + positionDistance + " stored Queue is corrupted" );
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
private void repair( ) throws LocalDBException
|
|
private void repair( ) throws LocalDBException
|
|
{
|
|
{
|
|
- int headTrim = 0;
|
|
|
|
- int tailTrim = 0;
|
|
|
|
|
|
+ lock.writeLock().lock();
|
|
|
|
+ try
|
|
|
|
+ {
|
|
|
|
+ int headTrim = 0;
|
|
|
|
+ int tailTrim = 0;
|
|
|
|
|
|
- debugOutput( "pre repair()" );
|
|
|
|
|
|
+ debugOutput( "pre repair()" );
|
|
|
|
|
|
- final AtomicInteger examinedRecords = new AtomicInteger( 0 );
|
|
|
|
|
|
+ final AtomicInteger examinedRecords = new AtomicInteger( 0 );
|
|
|
|
|
|
- final ConditionalTaskExecutor conditionalTaskExecutor = new ConditionalTaskExecutor(
|
|
|
|
- new Runnable()
|
|
|
|
|
|
+ final ConditionalTaskExecutor conditionalTaskExecutor = new ConditionalTaskExecutor( () ->
|
|
|
|
+ {
|
|
|
|
+ try
|
|
{
|
|
{
|
|
- @Override
|
|
|
|
- public void run( )
|
|
|
|
- {
|
|
|
|
- try
|
|
|
|
- {
|
|
|
|
- localDB.put( db, KEY_HEAD_POSITION, headPosition.toString() );
|
|
|
|
- localDB.put( db, KEY_TAIL_POSITION, tailPosition.toString() );
|
|
|
|
- final int dbSize = size();
|
|
|
|
- LOGGER.debug( () -> "repairing db " + db + ", " + examinedRecords.get() + " records examined"
|
|
|
|
- + ", size=" + dbSize
|
|
|
|
- + ", head=" + headPosition.toString() + ", tail=" + tailPosition.toString() );
|
|
|
|
- }
|
|
|
|
- catch ( final Exception e )
|
|
|
|
- {
|
|
|
|
- LOGGER.error( () -> "unexpected error during output of debug message during stored queue repair operation: " + e.getMessage(), e );
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- },
|
|
|
|
- new ConditionalTaskExecutor.TimeDurationPredicate( 30, TimeDuration.Unit.SECONDS )
|
|
|
|
- );
|
|
|
|
-
|
|
|
|
- // trim the top.
|
|
|
|
- while ( !headPosition.equals( tailPosition ) && localDB.get( db, headPosition.toString() ) == null )
|
|
|
|
- {
|
|
|
|
- examinedRecords.incrementAndGet();
|
|
|
|
- conditionalTaskExecutor.conditionallyExecuteTask();
|
|
|
|
- headPosition = headPosition.previous();
|
|
|
|
- headTrim++;
|
|
|
|
- }
|
|
|
|
- localDB.put( db, KEY_HEAD_POSITION, headPosition.toString() );
|
|
|
|
|
|
+ localDB.put( db, KEY_HEAD_POSITION, headPosition.key() );
|
|
|
|
+ localDB.put( db, KEY_TAIL_POSITION, tailPosition.key() );
|
|
|
|
+ final long dbSize = size();
|
|
|
|
+ LOGGER.debug( () -> "repairing db " + db + ", " + examinedRecords.get() + " records examined"
|
|
|
|
+ + ", size=" + dbSize
|
|
|
|
+ + ", head=" + headPosition.key() + ", tail=" + tailPosition.key() );
|
|
|
|
+ }
|
|
|
|
+ catch ( final Exception e )
|
|
|
|
+ {
|
|
|
|
+ LOGGER.error( () -> "unexpected error during output of debug message during stored queue repair operation: " + e.getMessage(), e );
|
|
|
|
+ }
|
|
|
|
+ },
|
|
|
|
+ new ConditionalTaskExecutor.TimeDurationPredicate( TimeDuration.SECONDS_10 )
|
|
|
|
+ );
|
|
|
|
|
|
- // trim the bottom.
|
|
|
|
- while ( !headPosition.equals( tailPosition ) && localDB.get( db, tailPosition.toString() ) == null )
|
|
|
|
- {
|
|
|
|
- examinedRecords.incrementAndGet();
|
|
|
|
- conditionalTaskExecutor.conditionallyExecuteTask();
|
|
|
|
- tailPosition = tailPosition.next();
|
|
|
|
- tailTrim++;
|
|
|
|
- }
|
|
|
|
- localDB.put( db, KEY_TAIL_POSITION, tailPosition.toString() );
|
|
|
|
|
|
+ // trim the top.
|
|
|
|
+ while ( !headPosition.equals( tailPosition ) && localDB.get( db, headPosition.key() ) == null )
|
|
|
|
+ {
|
|
|
|
+ examinedRecords.incrementAndGet();
|
|
|
|
+ conditionalTaskExecutor.conditionallyExecuteTask();
|
|
|
|
+ headPosition = headPosition.previous();
|
|
|
|
+ headTrim++;
|
|
|
|
+ }
|
|
|
|
+ localDB.put( db, KEY_HEAD_POSITION, headPosition.key() );
|
|
|
|
|
|
- if ( tailTrim == 0 && headTrim == 0 )
|
|
|
|
- {
|
|
|
|
- LOGGER.trace( () -> "repair unnecessary for " + db );
|
|
|
|
- }
|
|
|
|
- else
|
|
|
|
- {
|
|
|
|
- if ( headTrim > 0 )
|
|
|
|
|
|
+ // trim the bottom.
|
|
|
|
+ while ( !headPosition.equals( tailPosition ) && localDB.get( db, tailPosition.toString() ) == null )
|
|
{
|
|
{
|
|
- final int headTrimFinal = headTrim;
|
|
|
|
- LOGGER.warn( () -> "trimmed " + headTrimFinal + " from head position against database " + db );
|
|
|
|
|
|
+ examinedRecords.incrementAndGet();
|
|
|
|
+ conditionalTaskExecutor.conditionallyExecuteTask();
|
|
|
|
+ tailPosition = tailPosition.next();
|
|
|
|
+ tailTrim++;
|
|
}
|
|
}
|
|
|
|
+ localDB.put( db, KEY_TAIL_POSITION, tailPosition.key() );
|
|
|
|
|
|
- if ( tailTrim > 0 )
|
|
|
|
|
|
+ if ( tailTrim == 0 && headTrim == 0 )
|
|
{
|
|
{
|
|
- final int tailTrimFinal = tailTrim;
|
|
|
|
- LOGGER.warn( () -> "trimmed " + tailTrimFinal + " from tail position against database " + db );
|
|
|
|
|
|
+ LOGGER.trace( () -> "repair unnecessary for " + db );
|
|
}
|
|
}
|
|
- }
|
|
|
|
|
|
+ else
|
|
|
|
+ {
|
|
|
|
+ if ( headTrim > 0 )
|
|
|
|
+ {
|
|
|
|
+ final int headTrimFinal = headTrim;
|
|
|
|
+ LOGGER.warn( () -> "trimmed " + headTrimFinal + " from head position against database " + db );
|
|
|
|
+ }
|
|
|
|
|
|
|
|
+ if ( tailTrim > 0 )
|
|
|
|
+ {
|
|
|
|
+ final int tailTrimFinal = tailTrim;
|
|
|
|
+ LOGGER.warn( () -> "trimmed " + tailTrimFinal + " from tail position against database " + db );
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ finally
|
|
|
|
+ {
|
|
|
|
+ lock.writeLock().unlock();
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|