|
@@ -22,8 +22,7 @@ package password.pwm.util.localdb;
|
|
|
|
|
|
import password.pwm.PwmApplication;
|
|
|
import password.pwm.util.java.CollectionUtil;
|
|
|
-import password.pwm.util.java.ConditionalTaskExecutor;
|
|
|
-import password.pwm.util.java.TimeDuration;
|
|
|
+import password.pwm.util.java.StringUtil;
|
|
|
import password.pwm.util.logging.PwmLogger;
|
|
|
|
|
|
import java.util.ArrayList;
|
|
@@ -39,13 +38,12 @@ import java.util.Objects;
|
|
|
import java.util.Optional;
|
|
|
import java.util.Queue;
|
|
|
import java.util.Set;
|
|
|
-import java.util.concurrent.atomic.AtomicInteger;
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
import java.util.concurrent.atomic.AtomicReference;
|
|
|
import java.util.concurrent.locks.Lock;
|
|
|
import java.util.concurrent.locks.ReadWriteLock;
|
|
|
-import java.util.concurrent.locks.ReentrantLock;
|
|
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
|
+import java.util.function.Function;
|
|
|
import java.util.function.Supplier;
|
|
|
|
|
|
/**
|
|
@@ -64,6 +62,12 @@ public class LocalDBStoredQueue implements Queue<String>, Deque<String>
|
|
|
|
|
|
private final InternalQueue internalQueue;
|
|
|
|
|
|
+ private enum Direction
|
|
|
+ {
|
|
|
+ FORWARD,
|
|
|
+ REVERSE,
|
|
|
+ }
|
|
|
+
|
|
|
private LocalDBStoredQueue(
|
|
|
final LocalDB localDB,
|
|
|
final LocalDB.DB db,
|
|
@@ -110,7 +114,7 @@ public class LocalDBStoredQueue implements Queue<String>, Deque<String>
|
|
|
{
|
|
|
try
|
|
|
{
|
|
|
- internalQueue.removeLast( removalCount, false );
|
|
|
+ internalQueue.remove( removalCount, false, Direction.REVERSE );
|
|
|
}
|
|
|
catch ( final LocalDBException e )
|
|
|
{
|
|
@@ -122,7 +126,7 @@ public class LocalDBStoredQueue implements Queue<String>, Deque<String>
|
|
|
{
|
|
|
try
|
|
|
{
|
|
|
- internalQueue.removeFirst( removalCount, false );
|
|
|
+ internalQueue.remove( removalCount, false, Direction.FORWARD );
|
|
|
}
|
|
|
catch ( final LocalDBException e )
|
|
|
{
|
|
@@ -186,15 +190,12 @@ public class LocalDBStoredQueue implements Queue<String>, Deque<String>
|
|
|
{
|
|
|
try
|
|
|
{
|
|
|
- final Collection<String> stringCollection = new ArrayList<>();
|
|
|
- for ( final Object loopObj : c )
|
|
|
- {
|
|
|
- if ( loopObj != null )
|
|
|
- {
|
|
|
- stringCollection.add( loopObj.toString() );
|
|
|
- }
|
|
|
- }
|
|
|
- internalQueue.addFirst( stringCollection );
|
|
|
+ final List<String> list = ( c == null ? List.of() : c ).stream()
|
|
|
+ .filter( Objects::nonNull )
|
|
|
+ .map( String::valueOf )
|
|
|
+ .toList();
|
|
|
+
|
|
|
+ internalQueue.add( list, Direction.FORWARD );
|
|
|
return true;
|
|
|
}
|
|
|
catch ( final LocalDBException e )
|
|
@@ -214,7 +215,7 @@ public class LocalDBStoredQueue implements Queue<String>, Deque<String>
|
|
|
{
|
|
|
try
|
|
|
{
|
|
|
- internalQueue.addFirst( Collections.singletonList( s ) );
|
|
|
+ internalQueue.add( Collections.singletonList( s ), Direction.FORWARD );
|
|
|
return true;
|
|
|
}
|
|
|
catch ( final LocalDBException e )
|
|
@@ -273,7 +274,7 @@ public class LocalDBStoredQueue implements Queue<String>, Deque<String>
|
|
|
{
|
|
|
try
|
|
|
{
|
|
|
- internalQueue.addFirst( Collections.singletonList( s ) );
|
|
|
+ internalQueue.add( Collections.singletonList( s ), Direction.FORWARD );
|
|
|
}
|
|
|
catch ( final LocalDBException e )
|
|
|
{
|
|
@@ -286,7 +287,7 @@ public class LocalDBStoredQueue implements Queue<String>, Deque<String>
|
|
|
{
|
|
|
try
|
|
|
{
|
|
|
- internalQueue.addLast( Collections.singletonList( s ) );
|
|
|
+ internalQueue.add( Collections.singletonList( s ), Direction.REVERSE );
|
|
|
}
|
|
|
catch ( final LocalDBException e )
|
|
|
{
|
|
@@ -299,7 +300,7 @@ public class LocalDBStoredQueue implements Queue<String>, Deque<String>
|
|
|
{
|
|
|
try
|
|
|
{
|
|
|
- internalQueue.addFirst( Collections.singletonList( s ) );
|
|
|
+ internalQueue.add( Collections.singletonList( s ), Direction.FORWARD );
|
|
|
return true;
|
|
|
}
|
|
|
catch ( final LocalDBException e )
|
|
@@ -313,7 +314,7 @@ public class LocalDBStoredQueue implements Queue<String>, Deque<String>
|
|
|
{
|
|
|
try
|
|
|
{
|
|
|
- internalQueue.addLast( Collections.singletonList( s ) );
|
|
|
+ internalQueue.add( Collections.singletonList( s ), Direction.REVERSE );
|
|
|
return true;
|
|
|
}
|
|
|
catch ( final LocalDBException e )
|
|
@@ -349,7 +350,7 @@ public class LocalDBStoredQueue implements Queue<String>, Deque<String>
|
|
|
{
|
|
|
try
|
|
|
{
|
|
|
- final List<String> values = internalQueue.removeFirst( 1, true );
|
|
|
+ final List<String> values = internalQueue.remove( 1, true, Direction.FORWARD );
|
|
|
if ( values == null || values.isEmpty() )
|
|
|
{
|
|
|
return null;
|
|
@@ -367,7 +368,7 @@ public class LocalDBStoredQueue implements Queue<String>, Deque<String>
|
|
|
{
|
|
|
try
|
|
|
{
|
|
|
- final List<String> values = internalQueue.removeLast( 1, true );
|
|
|
+ final List<String> values = internalQueue.remove( 1, true, Direction.REVERSE );
|
|
|
if ( values == null || values.isEmpty() )
|
|
|
{
|
|
|
return null;
|
|
@@ -407,12 +408,7 @@ public class LocalDBStoredQueue implements Queue<String>, Deque<String>
|
|
|
{
|
|
|
try
|
|
|
{
|
|
|
- final List<String> values = internalQueue.getFirst( 1 );
|
|
|
- if ( CollectionUtil.isEmpty( values ) )
|
|
|
- {
|
|
|
- return null;
|
|
|
- }
|
|
|
- return values.get( 0 );
|
|
|
+ return internalQueue.peek( Direction.FORWARD ).orElse( null );
|
|
|
}
|
|
|
catch ( final LocalDBException e )
|
|
|
{
|
|
@@ -425,12 +421,7 @@ public class LocalDBStoredQueue implements Queue<String>, Deque<String>
|
|
|
{
|
|
|
try
|
|
|
{
|
|
|
- final List<String> values = internalQueue.getLast( 1 );
|
|
|
- if ( values == null || values.isEmpty() )
|
|
|
- {
|
|
|
- return null;
|
|
|
- }
|
|
|
- return values.get( 0 );
|
|
|
+ return internalQueue.peek( Direction.REVERSE ).orElse( null );
|
|
|
}
|
|
|
catch ( final LocalDBException e )
|
|
|
{
|
|
@@ -472,7 +463,7 @@ public class LocalDBStoredQueue implements Queue<String>, Deque<String>
|
|
|
{
|
|
|
try
|
|
|
{
|
|
|
- return new InnerIterator( internalQueue, false );
|
|
|
+ return new InnerIterator( internalQueue, Direction.REVERSE );
|
|
|
}
|
|
|
catch ( final LocalDBException e )
|
|
|
{
|
|
@@ -485,7 +476,7 @@ public class LocalDBStoredQueue implements Queue<String>, Deque<String>
|
|
|
{
|
|
|
try
|
|
|
{
|
|
|
- return new InnerIterator( internalQueue, true );
|
|
|
+ return new InnerIterator( internalQueue, Direction.FORWARD );
|
|
|
}
|
|
|
catch ( final LocalDBException e )
|
|
|
{
|
|
@@ -538,26 +529,39 @@ public class LocalDBStoredQueue implements Queue<String>, Deque<String>
|
|
|
|
|
|
private static class InnerIterator implements Iterator<String>
|
|
|
{
|
|
|
- private final AtomicReference<Position> position = new AtomicReference<>();
|
|
|
+ private final AtomicReference<Position> iteratorPosition = new AtomicReference<>();
|
|
|
private final InternalQueue internalQueue;
|
|
|
- private final boolean headFirst;
|
|
|
+ private final Direction direction;
|
|
|
+
|
|
|
+ //private final Position initialPosition;
|
|
|
|
|
|
- private final Position initialPosition;
|
|
|
+ /**
|
|
|
+ * Safety counter to make sure this iterator does not seek infinitely. It's possible concurrent modifications
|
|
|
+ * to the internal queue might cause the end position equality checks to miss.
|
|
|
+ */
|
|
|
private final AtomicLong itemsRemaining = new AtomicLong();
|
|
|
- private final Lock lock = new ReentrantLock();
|
|
|
+ private final Lock lock;
|
|
|
|
|
|
- private InnerIterator( final InternalQueue internalQueue, final boolean headFirst )
|
|
|
+ private InnerIterator( final InternalQueue internalQueue, final Direction direction )
|
|
|
throws LocalDBException
|
|
|
{
|
|
|
+ this.lock = internalQueue.lock.readLock();
|
|
|
this.internalQueue = internalQueue;
|
|
|
- this.headFirst = headFirst;
|
|
|
- this.initialPosition = internalQueue.size() == 0
|
|
|
- ? null
|
|
|
- : headFirst
|
|
|
- ? internalQueue.headPosition
|
|
|
- : internalQueue.tailPosition;
|
|
|
- position.set( initialPosition );
|
|
|
- itemsRemaining.set( internalQueue.size() );
|
|
|
+ this.direction = direction;
|
|
|
+
|
|
|
+ lock.lock();
|
|
|
+ try
|
|
|
+ {
|
|
|
+ final long currentSize = internalQueue.internalSize();
|
|
|
+ this.itemsRemaining.set( currentSize );
|
|
|
+ iteratorPosition.set( currentSize == 0
|
|
|
+ ? null
|
|
|
+ : internalQueue.currentPositionForDirection( direction ) );
|
|
|
+ }
|
|
|
+ finally
|
|
|
+ {
|
|
|
+ lock.unlock();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -566,7 +570,7 @@ public class LocalDBStoredQueue implements Queue<String>, Deque<String>
|
|
|
lock.lock();
|
|
|
try
|
|
|
{
|
|
|
- return position.get() != null;
|
|
|
+ return iteratorPosition.get() != null;
|
|
|
}
|
|
|
finally
|
|
|
{
|
|
@@ -590,26 +594,28 @@ public class LocalDBStoredQueue implements Queue<String>, Deque<String>
|
|
|
|
|
|
private String nextImpl()
|
|
|
{
|
|
|
- if ( position.get() == null )
|
|
|
+ if ( iteratorPosition.get() == null )
|
|
|
{
|
|
|
throw new NoSuchElementException();
|
|
|
}
|
|
|
- itemsRemaining.decrementAndGet();
|
|
|
+
|
|
|
try
|
|
|
{
|
|
|
- final String nextValue = internalQueue.localDB.get( internalQueue.db, position.get().key() ).orElseThrow();
|
|
|
- if ( headFirst )
|
|
|
- {
|
|
|
- position.updateAndGet( position -> Objects.equals( position, internalQueue.tailPosition ) ? null : position.previous() );
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- position.updateAndGet( position -> Objects.equals( position, internalQueue.headPosition ) ? null : position.next() );
|
|
|
- }
|
|
|
- if ( itemsRemaining.get() <= 0 || Objects.equals( position.get(), initialPosition ) )
|
|
|
+ final String nextValue = internalQueue.localDB.get( internalQueue.db, iteratorPosition.get().key() ).orElseThrow();
|
|
|
+
|
|
|
+ iteratorPosition.updateAndGet( position -> switch ( direction )
|
|
|
+ {
|
|
|
+ case FORWARD -> Objects.equals( position, internalQueue.tailPosition.get() ) ? null : position.previous();
|
|
|
+ case REVERSE -> Objects.equals( position, internalQueue.headPosition.get() ) ? null : position.next();
|
|
|
+ } );
|
|
|
+
|
|
|
+ itemsRemaining.decrementAndGet();
|
|
|
+
|
|
|
+ if ( itemsRemaining.get() < 1 )
|
|
|
{
|
|
|
- position.set( null );
|
|
|
+ iteratorPosition.set( null );
|
|
|
}
|
|
|
+
|
|
|
return nextValue;
|
|
|
}
|
|
|
catch ( final LocalDBException e )
|
|
@@ -625,27 +631,40 @@ public class LocalDBStoredQueue implements Queue<String>, Deque<String>
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- static class Position
|
|
|
+ record Position(
|
|
|
+ long position
|
|
|
+ )
|
|
|
{
|
|
|
private static final int RADIX = 36;
|
|
|
private static final long MAXIMUM_POSITION = Long.parseLong( "zzzzzz", RADIX );
|
|
|
private static final long MINIMUM_POSITION = 0;
|
|
|
+ private static final Position ZERO = new Position( 0 );
|
|
|
|
|
|
- private final long bigInt;
|
|
|
+ Position
|
|
|
+ {
|
|
|
+ if ( position > MAXIMUM_POSITION )
|
|
|
+ {
|
|
|
+ throw new IllegalStateException();
|
|
|
+ }
|
|
|
+ if ( position < MINIMUM_POSITION )
|
|
|
+ {
|
|
|
+ throw new IllegalStateException();
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- private Position( final long bigInt )
|
|
|
+ static Position fromKey( final String position )
|
|
|
{
|
|
|
- this.bigInt = bigInt;
|
|
|
+ return new Position( Long.parseLong( position, RADIX ) );
|
|
|
}
|
|
|
|
|
|
- Position( final String bigInt )
|
|
|
+ public static Position zero()
|
|
|
{
|
|
|
- this.bigInt = Long.parseLong( bigInt, RADIX );
|
|
|
+ return ZERO;
|
|
|
}
|
|
|
|
|
|
public Position next( )
|
|
|
{
|
|
|
- long next = bigInt + 1;
|
|
|
+ long next = position + 1;
|
|
|
if ( next > MAXIMUM_POSITION )
|
|
|
{
|
|
|
next = MINIMUM_POSITION;
|
|
@@ -655,7 +674,7 @@ public class LocalDBStoredQueue implements Queue<String>, Deque<String>
|
|
|
|
|
|
public Position previous( )
|
|
|
{
|
|
|
- long previous = bigInt - 1;
|
|
|
+ long previous = position - 1;
|
|
|
if ( previous < MINIMUM_POSITION )
|
|
|
{
|
|
|
previous = MAXIMUM_POSITION;
|
|
@@ -665,18 +684,18 @@ public class LocalDBStoredQueue implements Queue<String>, Deque<String>
|
|
|
|
|
|
public long distanceToHead( final Position head )
|
|
|
{
|
|
|
- final int compareToValue = Long.compare( head.bigInt, this.bigInt );
|
|
|
+ final int compareToValue = Long.compare( head.position, this.position );
|
|
|
if ( compareToValue == 0 )
|
|
|
{
|
|
|
return 0;
|
|
|
}
|
|
|
else if ( compareToValue == 1 )
|
|
|
{
|
|
|
- return head.bigInt - this.bigInt;
|
|
|
+ return head.position - this.position;
|
|
|
}
|
|
|
|
|
|
- final long tailToMax = MAXIMUM_POSITION - this.bigInt;
|
|
|
- final long minToHead = head.bigInt - MINIMUM_POSITION;
|
|
|
+ final long tailToMax = MAXIMUM_POSITION - this.position;
|
|
|
+ final long minToHead = head.position - MINIMUM_POSITION;
|
|
|
return minToHead + tailToMax + 1;
|
|
|
}
|
|
|
|
|
@@ -687,34 +706,7 @@ public class LocalDBStoredQueue implements Queue<String>, Deque<String>
|
|
|
|
|
|
public String key()
|
|
|
{
|
|
|
- final StringBuilder sb = new StringBuilder();
|
|
|
- sb.append( Long.toString( bigInt, RADIX ).toUpperCase() );
|
|
|
- while ( sb.length() < 6 )
|
|
|
- {
|
|
|
- sb.insert( 0, '0' );
|
|
|
- }
|
|
|
- return sb.toString();
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public boolean equals( final Object o )
|
|
|
- {
|
|
|
- if ( this == o )
|
|
|
- {
|
|
|
- return true;
|
|
|
- }
|
|
|
- if ( o == null || getClass() != o.getClass() )
|
|
|
- {
|
|
|
- return false;
|
|
|
- }
|
|
|
- final Position position = ( Position ) o;
|
|
|
- return bigInt == position.bigInt;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public int hashCode()
|
|
|
- {
|
|
|
- return Objects.hash( bigInt );
|
|
|
+ return StringUtil.padLeft( Long.toString( position, RADIX ).toUpperCase(), 6, '0' );
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -722,8 +714,8 @@ public class LocalDBStoredQueue implements Queue<String>, Deque<String>
|
|
|
{
|
|
|
private final LocalDB localDB;
|
|
|
private final LocalDB.DB db;
|
|
|
- private volatile Position headPosition;
|
|
|
- private volatile Position tailPosition;
|
|
|
+ private final AtomicReference<Position> headPosition = new AtomicReference<>();
|
|
|
+ private final AtomicReference<Position> tailPosition = new AtomicReference<>();
|
|
|
private boolean developerDebug = false;
|
|
|
private static final int DEBUG_MAX_ROWS = 50;
|
|
|
private static final int DEBUG_MAX_WIDTH = 120;
|
|
@@ -771,15 +763,13 @@ public class LocalDBStoredQueue implements Queue<String>, Deque<String>
|
|
|
clear();
|
|
|
}
|
|
|
|
|
|
- final Optional<String> headPositionStr = localDB.get( db, KEY_HEAD_POSITION );
|
|
|
- final Optional<String> tailPositionStr = localDB.get( db, KEY_TAIL_POSITION );
|
|
|
-
|
|
|
- headPosition = headPositionStr.isPresent() && headPositionStr.get().length() > 0 ? new Position( headPositionStr.get() ) : new Position( "0" );
|
|
|
- tailPosition = tailPositionStr.isPresent() && tailPositionStr.get().length() > 0 ? new Position( tailPositionStr.get() ) : new Position( "0" );
|
|
|
+ headPosition.set( initPosition( KEY_HEAD_POSITION ) );
|
|
|
+ tailPosition.set( initPosition( KEY_TAIL_POSITION ) );
|
|
|
|
|
|
{
|
|
|
final long finalSize = this.size();
|
|
|
- LOGGER.trace( () -> "loaded for db " + db + "; headPosition=" + headPosition + ", tailPosition=" + tailPosition + ", size=" + finalSize );
|
|
|
+ LOGGER.trace( () -> "loaded for db " + db + "; headPosition=" + headPosition.get()
|
|
|
+ + ", tailPosition=" + tailPosition.get() + ", size=" + finalSize );
|
|
|
}
|
|
|
|
|
|
repair();
|
|
@@ -787,6 +777,13 @@ public class LocalDBStoredQueue implements Queue<String>, Deque<String>
|
|
|
debugOutput( "post init()" );
|
|
|
}
|
|
|
|
|
|
+ private Position initPosition( final String dbKey )
|
|
|
+ throws LocalDBException
|
|
|
+ {
|
|
|
+ final Optional<String> positionStr = localDB.get( db, dbKey );
|
|
|
+ return positionStr.map( Position::fromKey ).orElse( Position.zero() );
|
|
|
+ }
|
|
|
+
|
|
|
private boolean checkVersion( ) throws LocalDBException
|
|
|
{
|
|
|
final Optional<String> storedVersion = localDB.get( db, KEY_VERSION );
|
|
@@ -806,14 +803,14 @@ public class LocalDBStoredQueue implements Queue<String>, Deque<String>
|
|
|
{
|
|
|
localDB.truncate( db );
|
|
|
|
|
|
- headPosition = new Position( "0" );
|
|
|
- tailPosition = new Position( "0" );
|
|
|
- final Map<String, String> keyValueMap = new HashMap<>();
|
|
|
- keyValueMap.put( KEY_HEAD_POSITION, headPosition.toString() );
|
|
|
- keyValueMap.put( KEY_TAIL_POSITION, tailPosition.toString() );
|
|
|
- keyValueMap.put( KEY_VERSION, VALUE_VERSION );
|
|
|
+ headPosition.set( Position.zero() );
|
|
|
+ tailPosition.set( Position.zero() );
|
|
|
+
|
|
|
+ localDB.putAll( db, Map.of(
|
|
|
+ KEY_HEAD_POSITION, headPosition.get().toString(),
|
|
|
+ KEY_TAIL_POSITION, tailPosition.get().toString(),
|
|
|
+ KEY_VERSION, VALUE_VERSION ) );
|
|
|
|
|
|
- localDB.putAll( db, keyValueMap );
|
|
|
debugOutput( "post clear()" );
|
|
|
}
|
|
|
finally
|
|
@@ -839,101 +836,62 @@ public class LocalDBStoredQueue implements Queue<String>, Deque<String>
|
|
|
private long internalSize( )
|
|
|
throws LocalDBException
|
|
|
{
|
|
|
- if ( headPosition.equals( tailPosition ) && localDB.get( db, headPosition.toString() ).isEmpty() )
|
|
|
+ if ( Objects.equals( headPosition.get(), tailPosition.get() ) && localDB.get( db, headPosition.get().toString() ).isEmpty() )
|
|
|
{
|
|
|
return 0;
|
|
|
}
|
|
|
- return tailPosition.distanceToHead( headPosition ) + 1;
|
|
|
+ return tailPosition.get().distanceToHead( headPosition.get() ) + 1;
|
|
|
}
|
|
|
|
|
|
- List<String> removeFirst( final int removalCount, final boolean returnValues ) throws LocalDBException
|
|
|
+ private List<String> remove( final int removalCount, final boolean returnValues, final Direction direction )
|
|
|
+ throws LocalDBException
|
|
|
{
|
|
|
lock.writeLock().lock();
|
|
|
try
|
|
|
{
|
|
|
- debugOutput( "pre removeFirst()" );
|
|
|
- final List<String> removedValues = removeImpl( removalCount, returnValues, true );
|
|
|
- debugOutput( "post removeFirst()" );
|
|
|
- return removedValues;
|
|
|
- }
|
|
|
- finally
|
|
|
- {
|
|
|
- lock.writeLock().unlock();
|
|
|
- }
|
|
|
- }
|
|
|
+ debugOutput( "pre remove() " + direction );
|
|
|
|
|
|
- List<String> removeLast( final int removalCount, final boolean returnValues ) throws LocalDBException
|
|
|
- {
|
|
|
- lock.writeLock().lock();
|
|
|
- try
|
|
|
- {
|
|
|
- debugOutput( "pre removeLast()" );
|
|
|
- final List<String> removedValues = removeImpl( removalCount, returnValues, false );
|
|
|
- debugOutput( "post removeLast()" );
|
|
|
- return removedValues;
|
|
|
- }
|
|
|
- finally
|
|
|
- {
|
|
|
- lock.writeLock().unlock();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private List<String> removeImpl( final int removalCount, final boolean returnValues, final boolean forward )
|
|
|
- throws LocalDBException
|
|
|
- {
|
|
|
- if ( removalCount < 1 )
|
|
|
- {
|
|
|
- return Collections.emptyList();
|
|
|
- }
|
|
|
-
|
|
|
- final List<String> removalKeys = new ArrayList<>( removalCount );
|
|
|
- final List<String> removedValues = new ArrayList<>( removalCount );
|
|
|
- Position loopPosition = forward ? headPosition : tailPosition;
|
|
|
- int removedPositions = 0;
|
|
|
- while ( removedPositions < removalCount )
|
|
|
- {
|
|
|
- removalKeys.add( loopPosition.key() );
|
|
|
- if ( returnValues )
|
|
|
+ if ( removalCount < 1 )
|
|
|
{
|
|
|
- final Optional<String> loopValue = localDB.get( db, loopPosition.key() );
|
|
|
- loopValue.ifPresent( removedValues::add );
|
|
|
+ return Collections.emptyList();
|
|
|
}
|
|
|
|
|
|
- if ( forward )
|
|
|
- {
|
|
|
- loopPosition = loopPosition.equals( tailPosition ) ? loopPosition : loopPosition.previous();
|
|
|
- }
|
|
|
- else
|
|
|
+ final List<String> removalKeys = new ArrayList<>( removalCount );
|
|
|
+ final List<String> removedValues = new ArrayList<>( removalCount );
|
|
|
+
|
|
|
+ Position loopPosition = currentPositionForDirection( direction );
|
|
|
+
|
|
|
+ int removedPositions = 0;
|
|
|
+ while ( removedPositions < removalCount )
|
|
|
{
|
|
|
- loopPosition = loopPosition.equals( headPosition ) ? loopPosition : loopPosition.next();
|
|
|
- }
|
|
|
+ removalKeys.add( loopPosition.key() );
|
|
|
+ if ( returnValues )
|
|
|
+ {
|
|
|
+ final Optional<String> loopValue = localDB.get( db, loopPosition.key() );
|
|
|
+ loopValue.ifPresent( removedValues::add );
|
|
|
+ }
|
|
|
|
|
|
- removedPositions++;
|
|
|
- }
|
|
|
- localDB.removeAll( db, removalKeys );
|
|
|
- localDB.put( db, forward ? KEY_HEAD_POSITION : KEY_TAIL_POSITION, loopPosition.key() );
|
|
|
+ loopPosition = switch ( direction )
|
|
|
+ {
|
|
|
+ case FORWARD -> loopPosition.equals( tailPosition.get() ) ? loopPosition : loopPosition.previous();
|
|
|
+ case REVERSE -> loopPosition.equals( headPosition.get() ) ? loopPosition : loopPosition.next();
|
|
|
+ };
|
|
|
|
|
|
- if ( forward )
|
|
|
- {
|
|
|
- headPosition = loopPosition;
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- tailPosition = loopPosition;
|
|
|
- }
|
|
|
+ removedPositions++;
|
|
|
+ }
|
|
|
|
|
|
- return Collections.unmodifiableList( removedValues );
|
|
|
- }
|
|
|
+ localDB.removeAll( db, removalKeys );
|
|
|
+ localDB.put( db, direction == Direction.FORWARD ? KEY_HEAD_POSITION : KEY_TAIL_POSITION, loopPosition.key() );
|
|
|
|
|
|
- void addFirst( final Collection<String> values )
|
|
|
- throws LocalDBException
|
|
|
- {
|
|
|
- lock.writeLock().lock();
|
|
|
- try
|
|
|
- {
|
|
|
- debugOutput( "pre addFirst()" );
|
|
|
- addImpl( values, true );
|
|
|
- debugOutput( "post addFirst()" );
|
|
|
+ switch ( direction )
|
|
|
+ {
|
|
|
+ case FORWARD -> headPosition.set( loopPosition );
|
|
|
+ case REVERSE -> tailPosition.set( loopPosition );
|
|
|
+ default -> throw new IllegalStateException();
|
|
|
+ }
|
|
|
+
|
|
|
+ debugOutput( "post remove() " + direction );
|
|
|
+ return List.copyOf( removedValues );
|
|
|
}
|
|
|
finally
|
|
|
{
|
|
@@ -948,13 +906,8 @@ public class LocalDBStoredQueue implements Queue<String>, Deque<String>
|
|
|
try
|
|
|
{
|
|
|
debugOutput( "pre toArray()" );
|
|
|
- final int size = Math.toIntExact( internalSize() );
|
|
|
- final String[] stringArray = new String[ size ];
|
|
|
- final InnerIterator iterator = new InnerIterator( this, true );
|
|
|
- for ( int i = 0; i < size; i++ )
|
|
|
- {
|
|
|
- stringArray[i] = iterator.next();
|
|
|
- }
|
|
|
+ final String[] stringArray = CollectionUtil.iteratorToStream( new InnerIterator( this, Direction.FORWARD ) )
|
|
|
+ .toArray( String[]::new );
|
|
|
debugOutput( "post toArray()" );
|
|
|
return stringArray;
|
|
|
}
|
|
@@ -964,123 +917,87 @@ public class LocalDBStoredQueue implements Queue<String>, Deque<String>
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- void addLast( final Collection<String> values ) throws LocalDBException
|
|
|
+ private Position currentPositionForDirection( final Direction direction )
|
|
|
{
|
|
|
- lock.writeLock().lock();
|
|
|
- try
|
|
|
- {
|
|
|
- debugOutput( "pre addLast()" );
|
|
|
- addImpl( values, false );
|
|
|
- debugOutput( "post addLast()" );
|
|
|
- }
|
|
|
- finally
|
|
|
- {
|
|
|
- lock.writeLock().unlock();
|
|
|
- }
|
|
|
+ return direction == Direction.FORWARD ? headPosition.get() : tailPosition.get();
|
|
|
}
|
|
|
|
|
|
- private void addImpl( final Collection<String> values, final boolean forward )
|
|
|
+ private void add( final Collection<String> values, final Direction direction )
|
|
|
throws LocalDBException
|
|
|
{
|
|
|
- if ( CollectionUtil.isEmpty( values ) )
|
|
|
- {
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- if ( internalSize() + values.size() > MAX_SIZE )
|
|
|
+ lock.writeLock().lock();
|
|
|
+ try
|
|
|
{
|
|
|
- throw new IllegalStateException( "queue overflow" );
|
|
|
- }
|
|
|
-
|
|
|
- final Iterator<String> valueIterator = values.iterator();
|
|
|
-
|
|
|
- final Map<String, String> keyValueMap = new HashMap<>( values.size() );
|
|
|
- Position loopPosition = forward ? headPosition : tailPosition;
|
|
|
+ debugOutput( "pre add() " + direction );
|
|
|
+ if ( CollectionUtil.isEmpty( values ) )
|
|
|
+ {
|
|
|
+ return;
|
|
|
+ }
|
|
|
|
|
|
- if ( internalSize() == 0 )
|
|
|
- {
|
|
|
- keyValueMap.put( loopPosition.toString(), valueIterator.next() );
|
|
|
- }
|
|
|
+ if ( internalSize() + values.size() > MAX_SIZE )
|
|
|
+ {
|
|
|
+ throw new IllegalStateException( "queue overflow" );
|
|
|
+ }
|
|
|
|
|
|
- while ( valueIterator.hasNext() )
|
|
|
- {
|
|
|
- loopPosition = forward ? loopPosition.next() : loopPosition.previous();
|
|
|
- keyValueMap.put( loopPosition.key(), valueIterator.next() );
|
|
|
- }
|
|
|
+ final Iterator<String> valueIterator = values.iterator();
|
|
|
|
|
|
- keyValueMap.put( forward ? KEY_HEAD_POSITION : KEY_TAIL_POSITION, loopPosition.key() );
|
|
|
- localDB.putAll( db, keyValueMap );
|
|
|
+ final Map<String, String> keyValueMap = new HashMap<>( values.size() );
|
|
|
+ Position loopPosition = currentPositionForDirection( direction );
|
|
|
|
|
|
- if ( forward )
|
|
|
- {
|
|
|
- headPosition = loopPosition;
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- tailPosition = loopPosition;
|
|
|
- }
|
|
|
- }
|
|
|
+ if ( internalSize() == 0 )
|
|
|
+ {
|
|
|
+ keyValueMap.put( loopPosition.toString(), valueIterator.next() );
|
|
|
+ }
|
|
|
|
|
|
+ while ( valueIterator.hasNext() )
|
|
|
+ {
|
|
|
+ loopPosition = direction == Direction.FORWARD ? loopPosition.next() : loopPosition.previous();
|
|
|
+ keyValueMap.put( loopPosition.key(), valueIterator.next() );
|
|
|
+ }
|
|
|
|
|
|
+ keyValueMap.put( direction == Direction.FORWARD ? KEY_HEAD_POSITION : KEY_TAIL_POSITION, loopPosition.key() );
|
|
|
+ localDB.putAll( db, keyValueMap );
|
|
|
|
|
|
- List<String> getFirst( final int count )
|
|
|
- throws LocalDBException
|
|
|
- {
|
|
|
- lock.readLock().lock();
|
|
|
- try
|
|
|
- {
|
|
|
- debugOutput( "pre getFirst()" );
|
|
|
- final List<String> returnList = getImpl( count, true );
|
|
|
- debugOutput( "post getFirst()" );
|
|
|
- return returnList;
|
|
|
+ if ( direction == Direction.FORWARD )
|
|
|
+ {
|
|
|
+ headPosition.set( loopPosition );
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ tailPosition.set( loopPosition );
|
|
|
+ }
|
|
|
+ debugOutput( "post add() " + direction );
|
|
|
}
|
|
|
finally
|
|
|
{
|
|
|
- lock.readLock().unlock();
|
|
|
+ lock.writeLock().unlock();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- List<String> getLast( final int count )
|
|
|
+ private Optional<String> peek( final Direction direction )
|
|
|
throws LocalDBException
|
|
|
{
|
|
|
lock.readLock().lock();
|
|
|
try
|
|
|
{
|
|
|
- debugOutput( "pre getLast()" );
|
|
|
- final List<String> returnList = getImpl( count, false );
|
|
|
- debugOutput( "post getLast()" );
|
|
|
- return returnList;
|
|
|
- }
|
|
|
- finally
|
|
|
- {
|
|
|
- lock.readLock().unlock();
|
|
|
- }
|
|
|
- }
|
|
|
+ debugOutput( "pre get() " + direction );
|
|
|
|
|
|
- private List<String> getImpl( final long count, final boolean forward )
|
|
|
- throws LocalDBException
|
|
|
- {
|
|
|
- long getCount = count;
|
|
|
- if ( getCount < 1 )
|
|
|
- {
|
|
|
- return Collections.emptyList();
|
|
|
- }
|
|
|
+ if ( internalSize() <= 1 )
|
|
|
+ {
|
|
|
+ return Optional.empty();
|
|
|
+ }
|
|
|
|
|
|
- if ( getCount > internalSize() )
|
|
|
- {
|
|
|
- getCount = internalSize();
|
|
|
- }
|
|
|
+ final Position nextPosition = currentPositionForDirection( direction );
|
|
|
+ debugOutput( "post get() " + direction );
|
|
|
|
|
|
- final List<String> returnList = new ArrayList<>();
|
|
|
+ return localDB.get( db, nextPosition.key() );
|
|
|
|
|
|
- Position nextPosition = forward ? headPosition : tailPosition;
|
|
|
- while ( returnList.size() < getCount )
|
|
|
+ }
|
|
|
+ finally
|
|
|
{
|
|
|
- returnList.add( localDB.get( db, nextPosition.key() ).orElseThrow() );
|
|
|
- nextPosition = forward ? nextPosition.previous() : nextPosition.next();
|
|
|
+ lock.readLock().unlock();
|
|
|
}
|
|
|
|
|
|
- return Collections.unmodifiableList( returnList );
|
|
|
}
|
|
|
|
|
|
void debugOutput( final String input )
|
|
@@ -1096,10 +1013,11 @@ public class LocalDBStoredQueue implements Queue<String>, Deque<String>
|
|
|
try
|
|
|
{
|
|
|
sb.append( input );
|
|
|
- sb.append( " tailPosition=" ).append( tailPosition ).append( ", headPosition=" ).append( headPosition ).append( ", db=" ).append( db );
|
|
|
+ sb.append( " tailPosition=" ).append( tailPosition.get() ).append( ", headPosition=" )
|
|
|
+ .append( headPosition.get() ).append( ", db=" ).append( db );
|
|
|
sb.append( ", size=" ).append( internalSize() ).append( '\n' );
|
|
|
|
|
|
- try ( LocalDB.LocalDBIterator<Map.Entry<String, String>> localDBIterator = localDB.iterator( db ) )
|
|
|
+ try ( LocalDB.LocalDBIterator localDBIterator = localDB.iterator( db ) )
|
|
|
{
|
|
|
int rowCount = 0;
|
|
|
while ( localDBIterator.hasNext() && rowCount < DEBUG_MAX_ROWS )
|
|
@@ -1126,82 +1044,79 @@ public class LocalDBStoredQueue implements Queue<String>, Deque<String>
|
|
|
LOGGER.trace( debugOutput );
|
|
|
}
|
|
|
|
|
|
- private void repair( ) throws LocalDBException
|
|
|
+ private void repair( )
|
|
|
+ throws LocalDBException
|
|
|
{
|
|
|
lock.writeLock().lock();
|
|
|
try
|
|
|
{
|
|
|
- int headTrim = 0;
|
|
|
- int tailTrim = 0;
|
|
|
-
|
|
|
debugOutput( "pre repair()" );
|
|
|
|
|
|
- final AtomicInteger examinedRecords = new AtomicInteger( 0 );
|
|
|
-
|
|
|
- final Runnable checkPointProcess = () ->
|
|
|
- {
|
|
|
- try
|
|
|
- {
|
|
|
- localDB.put( db, KEY_HEAD_POSITION, headPosition.key() );
|
|
|
- localDB.put( db, KEY_TAIL_POSITION, tailPosition.key() );
|
|
|
- final long dbSize = size();
|
|
|
- LOGGER.debug( () -> "repairing db " + db + ", " + examinedRecords.get() + " records examined"
|
|
|
- + ", size=" + dbSize
|
|
|
- + ", head=" + headPosition.key() + ", tail=" + tailPosition.key() );
|
|
|
- }
|
|
|
- catch ( final Exception e )
|
|
|
- {
|
|
|
- LOGGER.error( () -> "unexpected error during output of debug message during stored queue repair operation: "
|
|
|
- + e.getMessage(), e );
|
|
|
- }
|
|
|
- };
|
|
|
+ final int headTrimCounter = trimQueueEnd( headPosition, Position::previous );
|
|
|
+ final int tailTrimCounter = trimQueueEnd( tailPosition, Position::next );
|
|
|
|
|
|
- final ConditionalTaskExecutor conditionalTaskExecutor = ConditionalTaskExecutor.forPeriodicTask(
|
|
|
- checkPointProcess,
|
|
|
- TimeDuration.SECONDS_10.asDuration() );
|
|
|
+ outputRepairConclusion( tailTrimCounter, headTrimCounter );
|
|
|
+ }
|
|
|
+ finally
|
|
|
+ {
|
|
|
+ lock.writeLock().unlock();
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- // trim the top.
|
|
|
- while ( !headPosition.equals( tailPosition ) && localDB.get( db, headPosition.key() ).isPresent() )
|
|
|
- {
|
|
|
- examinedRecords.incrementAndGet();
|
|
|
- conditionalTaskExecutor.conditionallyExecuteTask();
|
|
|
- headPosition = headPosition.previous();
|
|
|
- headTrim++;
|
|
|
- }
|
|
|
- localDB.put( db, KEY_HEAD_POSITION, headPosition.key() );
|
|
|
+ private int trimQueueEnd( final AtomicReference<Position> examinePosition, final Function<Position, Position> calcNextPosition )
|
|
|
+ throws LocalDBException
|
|
|
+ {
|
|
|
+ long safetyCounter = internalSize();
|
|
|
+ Position loopPosition = examinePosition.get();
|
|
|
+ int counter = 0;
|
|
|
+ while ( safetyCounter >= 0 && !headPosition.get().equals( tailPosition.get() )
|
|
|
+ && localDB.get( db, loopPosition.key() ).isEmpty() )
|
|
|
+ {
|
|
|
+ counter++;
|
|
|
+ loopPosition = calcNextPosition.apply( loopPosition );
|
|
|
+ writeRepairState( counter );
|
|
|
+ safetyCounter--;
|
|
|
+ examinePosition.set( loopPosition );
|
|
|
+ }
|
|
|
+ return counter;
|
|
|
+ }
|
|
|
|
|
|
- // trim the bottom.
|
|
|
- while ( !headPosition.equals( tailPosition ) && localDB.get( db, tailPosition.toString() ).isPresent() )
|
|
|
+ private void outputRepairConclusion( final int tailTrimCounter, final int headTrimCounter )
|
|
|
+ {
|
|
|
+ if ( tailTrimCounter == 0 && headTrimCounter == 0 )
|
|
|
+ {
|
|
|
+ LOGGER.trace( () -> "repair unnecessary for " + db );
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ if ( headTrimCounter > 0 )
|
|
|
{
|
|
|
- examinedRecords.incrementAndGet();
|
|
|
- conditionalTaskExecutor.conditionallyExecuteTask();
|
|
|
- tailPosition = tailPosition.next();
|
|
|
- tailTrim++;
|
|
|
+ LOGGER.warn( () -> "trimmed " + headTrimCounter + " from head position against database " + db );
|
|
|
}
|
|
|
- localDB.put( db, KEY_TAIL_POSITION, tailPosition.key() );
|
|
|
|
|
|
- if ( tailTrim == 0 && headTrim == 0 )
|
|
|
+ if ( tailTrimCounter > 0 )
|
|
|
{
|
|
|
- LOGGER.trace( () -> "repair unnecessary for " + db );
|
|
|
+ LOGGER.warn( () -> "trimmed " + tailTrimCounter + " from tail position against database " + db );
|
|
|
}
|
|
|
- else
|
|
|
- {
|
|
|
- if ( headTrim > 0 )
|
|
|
- {
|
|
|
- final int headTrimFinal = headTrim;
|
|
|
- LOGGER.warn( () -> "trimmed " + headTrimFinal + " from head position against database " + db );
|
|
|
- }
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- if ( tailTrim > 0 )
|
|
|
- {
|
|
|
- final int tailTrimFinal = tailTrim;
|
|
|
- LOGGER.warn( () -> "trimmed " + tailTrimFinal + " from tail position against database " + db );
|
|
|
- }
|
|
|
- }
|
|
|
+ private void writeRepairState( final int examinedRecords )
|
|
|
+ {
|
|
|
+ try
|
|
|
+ {
|
|
|
+ localDB.putAll( db, Map.ofEntries(
|
|
|
+ Map.entry( KEY_HEAD_POSITION, headPosition.get().key() ),
|
|
|
+ Map.entry( KEY_TAIL_POSITION, tailPosition.get().key() ) ) );
|
|
|
+ final long dbSize = size();
|
|
|
+ LOGGER.debug( () -> "repairing db " + db + ", " + examinedRecords + " records examined"
|
|
|
+ + ", size=" + dbSize
|
|
|
+ + ", head=" + headPosition.get().key() + ", tail=" + tailPosition.get().key() );
|
|
|
}
|
|
|
- finally
|
|
|
+ catch ( final Exception e )
|
|
|
{
|
|
|
- lock.writeLock().unlock();
|
|
|
+ LOGGER.error( () -> "unexpected error during output of debug message during stored queue repair operation: "
|
|
|
+ + e.getMessage(), e );
|
|
|
}
|
|
|
}
|
|
|
}
|