|
@@ -27,11 +27,7 @@ import password.pwm.error.ErrorInformation;
|
|
|
import password.pwm.error.PwmError;
|
|
|
import password.pwm.error.PwmException;
|
|
|
import password.pwm.error.PwmUnrecoverableException;
|
|
|
-import password.pwm.util.db.DatabaseService;
|
|
|
-import password.pwm.util.db.DatabaseTable;
|
|
|
-import password.pwm.util.java.ClosableIterator;
|
|
|
import password.pwm.util.java.JavaHelper;
|
|
|
-import password.pwm.util.java.JsonUtil;
|
|
|
import password.pwm.util.java.TimeDuration;
|
|
|
import password.pwm.util.logging.PwmLogger;
|
|
|
|
|
@@ -45,38 +41,32 @@ import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.ScheduledExecutorService;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
-class DatabaseClusterProvider implements ClusterProvider
|
|
|
+class ClusterMachine
|
|
|
{
|
|
|
-
|
|
|
- private static final PwmLogger LOGGER = PwmLogger.forClass( DatabaseClusterProvider.class );
|
|
|
-
|
|
|
- private static final DatabaseTable TABLE = DatabaseTable.CLUSTER_STATE;
|
|
|
-
|
|
|
-
|
|
|
- private static final String KEY_PREFIX_NODE = "node-";
|
|
|
+ private static final PwmLogger LOGGER = PwmLogger.forClass( ClusterMachine.class );
|
|
|
|
|
|
private final PwmApplication pwmApplication;
|
|
|
- private final DatabaseService databaseService;
|
|
|
private final ScheduledExecutorService executorService;
|
|
|
+ private final ClusterDataServiceProvider clusterDataServiceProvider;
|
|
|
|
|
|
private ErrorInformation lastError;
|
|
|
|
|
|
- private final Map<String, DatabaseStoredNodeData> nodeDatas = new ConcurrentHashMap<>();
|
|
|
+ private final Map<String, StoredNodeData> knownNodes = new ConcurrentHashMap<>();
|
|
|
|
|
|
- private final DatabaseClusterSettings settings;
|
|
|
+ private final ClusterSettings settings;
|
|
|
+ private final ClusterStatistics clusterStatistics = new ClusterStatistics();
|
|
|
|
|
|
- DatabaseClusterProvider( final PwmApplication pwmApplication ) throws PwmUnrecoverableException
|
|
|
+ ClusterMachine(
|
|
|
+ final PwmApplication pwmApplication,
|
|
|
+ final ClusterDataServiceProvider clusterDataServiceProvider,
|
|
|
+ final ClusterSettings clusterSettings
|
|
|
+ )
|
|
|
{
|
|
|
this.pwmApplication = pwmApplication;
|
|
|
- this.settings = DatabaseClusterSettings.fromConfig( pwmApplication.getConfig() );
|
|
|
-
|
|
|
- if ( !settings.isEnable() )
|
|
|
- {
|
|
|
- throw new PwmUnrecoverableException( new ErrorInformation( PwmError.CONFIG_FORMAT_ERROR, "database clustering is not enabled via app property" ) );
|
|
|
- }
|
|
|
+ this.clusterDataServiceProvider = clusterDataServiceProvider;
|
|
|
+ this.settings = clusterSettings;
|
|
|
|
|
|
- this.databaseService = pwmApplication.getDatabaseService();
|
|
|
- this.executorService = JavaHelper.makeSingleThreadExecutorService( pwmApplication, DatabaseClusterProvider.class );
|
|
|
+ this.executorService = JavaHelper.makeSingleThreadExecutorService( pwmApplication, ClusterMachine.class );
|
|
|
|
|
|
final long intervalSeconds = settings.getHeartbeatInterval().getTotalSeconds();
|
|
|
|
|
@@ -88,19 +78,17 @@ class DatabaseClusterProvider implements ClusterProvider
|
|
|
);
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
public void close( )
|
|
|
{
|
|
|
JavaHelper.closeAndWaitExecutor( executorService, new TimeDuration( 1, TimeUnit.SECONDS ) );
|
|
|
}
|
|
|
|
|
|
|
|
|
- @Override
|
|
|
public List<NodeInfo> nodes( ) throws PwmUnrecoverableException
|
|
|
{
|
|
|
final Map<String, NodeInfo> returnObj = new TreeMap<>();
|
|
|
final String configHash = pwmApplication.getConfig().configurationHash();
|
|
|
- for ( final DatabaseStoredNodeData storedNodeData : nodeDatas.values() )
|
|
|
+ for ( final StoredNodeData storedNodeData : knownNodes.values() )
|
|
|
{
|
|
|
final boolean configMatch = configHash.equals( storedNodeData.getConfigHash() );
|
|
|
final boolean timedOut = isTimedOut( storedNodeData );
|
|
@@ -132,7 +120,7 @@ class DatabaseClusterProvider implements ClusterProvider
|
|
|
|
|
|
private String masterInstanceId( )
|
|
|
{
|
|
|
- final List<DatabaseStoredNodeData> copiedDatas = new ArrayList<>( nodeDatas.values() );
|
|
|
+ final List<StoredNodeData> copiedDatas = new ArrayList<>( knownNodes.values() );
|
|
|
if ( copiedDatas.isEmpty() )
|
|
|
{
|
|
|
return null;
|
|
@@ -141,7 +129,7 @@ class DatabaseClusterProvider implements ClusterProvider
|
|
|
String masterID = null;
|
|
|
Instant eldestRecord = Instant.now();
|
|
|
|
|
|
- for ( final DatabaseStoredNodeData nodeData : copiedDatas )
|
|
|
+ for ( final StoredNodeData nodeData : copiedDatas )
|
|
|
{
|
|
|
if ( !isTimedOut( nodeData ) )
|
|
|
{
|
|
@@ -155,7 +143,6 @@ class DatabaseClusterProvider implements ClusterProvider
|
|
|
return masterID;
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
public boolean isMaster( )
|
|
|
{
|
|
|
final String myID = pwmApplication.getInstanceID();
|
|
@@ -163,27 +150,21 @@ class DatabaseClusterProvider implements ClusterProvider
|
|
|
return myID.equals( masterID );
|
|
|
}
|
|
|
|
|
|
- private boolean isMaster( final DatabaseStoredNodeData databaseStoredNodeData )
|
|
|
+ private boolean isMaster( final StoredNodeData storedNodeData )
|
|
|
{
|
|
|
final String masterID = masterInstanceId();
|
|
|
- return databaseStoredNodeData.getInstanceID().equals( masterID );
|
|
|
+ return storedNodeData.getInstanceID().equals( masterID );
|
|
|
}
|
|
|
|
|
|
- private String dbKeyForStoredNode( final DatabaseStoredNodeData storedNodeData ) throws PwmUnrecoverableException
|
|
|
+ private boolean isTimedOut( final StoredNodeData storedNodeData )
|
|
|
{
|
|
|
- final String instanceID = storedNodeData.getInstanceID();
|
|
|
- final String hash = pwmApplication.getSecureService().hash( instanceID );
|
|
|
- final String truncatedHash = hash.length() > 64
|
|
|
- ? hash.substring( 0, 64 )
|
|
|
- : hash;
|
|
|
-
|
|
|
- return KEY_PREFIX_NODE + truncatedHash;
|
|
|
+ final TimeDuration age = TimeDuration.fromCurrent( storedNodeData.getTimestamp() );
|
|
|
+ return age.isLongerThan( settings.getNodeTimeout() );
|
|
|
}
|
|
|
|
|
|
- private boolean isTimedOut( final DatabaseStoredNodeData storedNodeData )
|
|
|
+ public ErrorInformation getLastError( )
|
|
|
{
|
|
|
- final TimeDuration age = TimeDuration.fromCurrent( storedNodeData.getTimestamp() );
|
|
|
- return age.isLongerThan( settings.getNodeTimeout() );
|
|
|
+ return lastError;
|
|
|
}
|
|
|
|
|
|
private class HeartbeatProcess implements Runnable
|
|
@@ -199,15 +180,14 @@ class DatabaseClusterProvider implements ClusterProvider
|
|
|
{
|
|
|
try
|
|
|
{
|
|
|
- final DatabaseStoredNodeData storedNodeData = DatabaseStoredNodeData.makeNew( pwmApplication );
|
|
|
- final String key = dbKeyForStoredNode( storedNodeData );
|
|
|
- final String value = JsonUtil.serialize( storedNodeData );
|
|
|
- databaseService.getAccessor().put( TABLE, key, value );
|
|
|
+ final StoredNodeData storedNodeData = StoredNodeData.makeNew( pwmApplication );
|
|
|
+ clusterDataServiceProvider.writeNodeStatus( storedNodeData );
|
|
|
+ clusterStatistics.getClusterWrites().incrementAndGet();
|
|
|
}
|
|
|
catch ( PwmException e )
|
|
|
{
|
|
|
final String errorMsg = "error writing database cluster heartbeat: " + e.getMessage();
|
|
|
- final ErrorInformation errorInformation = new ErrorInformation( PwmError.ERROR_DB_UNAVAILABLE, errorMsg );
|
|
|
+ final ErrorInformation errorInformation = new ErrorInformation( PwmError.ERROR_CLUSTER_SERVICE_ERROR, errorMsg );
|
|
|
lastError = errorInformation;
|
|
|
LOGGER.error( lastError );
|
|
|
}
|
|
@@ -215,23 +195,16 @@ class DatabaseClusterProvider implements ClusterProvider
|
|
|
|
|
|
void readNodeStatuses( )
|
|
|
{
|
|
|
- try ( ClosableIterator<String> tableIterator = databaseService.getAccessor().iterator( TABLE ) )
|
|
|
+ try
|
|
|
{
|
|
|
- while ( tableIterator.hasNext() )
|
|
|
- {
|
|
|
- final String dbKey = tableIterator.next();
|
|
|
- if ( dbKey.startsWith( KEY_PREFIX_NODE ) )
|
|
|
- {
|
|
|
- final String rawValueInDb = databaseService.getAccessor().get( TABLE, dbKey );
|
|
|
- final DatabaseStoredNodeData nodeDataInDb = JsonUtil.deserialize( rawValueInDb, DatabaseStoredNodeData.class );
|
|
|
- nodeDatas.put( nodeDataInDb.getInstanceID(), nodeDataInDb );
|
|
|
- }
|
|
|
- }
|
|
|
+ final Map<String, StoredNodeData> readNodeData = clusterDataServiceProvider.readStoredData();
|
|
|
+ knownNodes.putAll( readNodeData );
|
|
|
+ clusterStatistics.getClusterReads().incrementAndGet();
|
|
|
}
|
|
|
catch ( PwmException e )
|
|
|
{
|
|
|
- final String errorMsg = "error reading database node statuses: " + e.getMessage();
|
|
|
- final ErrorInformation errorInformation = new ErrorInformation( PwmError.ERROR_DB_UNAVAILABLE, errorMsg );
|
|
|
+ final String errorMsg = "error reading node statuses: " + e.getMessage();
|
|
|
+ final ErrorInformation errorInformation = new ErrorInformation( PwmError.ERROR_CLUSTER_SERVICE_ERROR, errorMsg );
|
|
|
lastError = errorInformation;
|
|
|
LOGGER.error( lastError );
|
|
|
}
|
|
@@ -239,30 +212,23 @@ class DatabaseClusterProvider implements ClusterProvider
|
|
|
|
|
|
void purgeOutdatedNodes( )
|
|
|
{
|
|
|
- for ( final DatabaseStoredNodeData storedNodeData : nodeDatas.values() )
|
|
|
+ try
|
|
|
{
|
|
|
- final TimeDuration recordAge = TimeDuration.fromCurrent( storedNodeData.getTimestamp() );
|
|
|
- final String instanceID = storedNodeData.getInstanceID();
|
|
|
-
|
|
|
- if ( recordAge.isLongerThan( settings.getNodePurgeInterval() ) )
|
|
|
- {
|
|
|
- // purge outdated records
|
|
|
- LOGGER.debug( "purging outdated node reference to instanceID '" + instanceID + "'" );
|
|
|
-
|
|
|
- try
|
|
|
- {
|
|
|
- databaseService.getAccessor().remove( TABLE, dbKeyForStoredNode( storedNodeData ) );
|
|
|
- }
|
|
|
- catch ( PwmException e )
|
|
|
- {
|
|
|
- final String errorMsg = "error purging outdated node reference: " + e.getMessage();
|
|
|
- final ErrorInformation errorInformation = new ErrorInformation( PwmError.ERROR_DB_UNAVAILABLE, errorMsg );
|
|
|
- lastError = errorInformation;
|
|
|
- LOGGER.error( lastError );
|
|
|
- }
|
|
|
- nodeDatas.remove( instanceID );
|
|
|
- }
|
|
|
+ final int purges = clusterDataServiceProvider.purgeOutdatedNodes( settings.getNodePurgeInterval() );
|
|
|
+ clusterStatistics.getNodePurges().addAndGet( purges );
|
|
|
+ }
|
|
|
+ catch ( PwmException e )
|
|
|
+ {
|
|
|
+ final String errorMsg = "error purging outdated node reference: " + e.getMessage();
|
|
|
+ final ErrorInformation errorInformation = new ErrorInformation( PwmError.ERROR_CLUSTER_SERVICE_ERROR, errorMsg );
|
|
|
+ lastError = errorInformation;
|
|
|
+ LOGGER.error( lastError );
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ public ClusterStatistics getClusterStatistics( )
|
|
|
+ {
|
|
|
+ return clusterStatistics;
|
|
|
+ }
|
|
|
}
|