|
@@ -32,7 +32,12 @@ import java.sql.Connection;
|
|
|
import java.sql.PreparedStatement;
|
|
|
import java.sql.ResultSet;
|
|
|
import java.sql.SQLException;
|
|
|
+import java.util.HashSet;
|
|
|
import java.util.Map;
|
|
|
+import java.util.Set;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
+import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
+import java.util.concurrent.atomic.AtomicInteger;
|
|
|
import java.util.concurrent.locks.ReentrantLock;
|
|
|
|
|
|
/**
|
|
@@ -48,6 +53,14 @@ class DatabaseAccessorImpl implements DatabaseAccessor {
|
|
|
|
|
|
private final boolean traceLogEnabled;
|
|
|
|
|
|
+ private static final AtomicInteger ACCESSOR_COUNTER = new AtomicInteger(0);
|
|
|
+ private final int accessorNumber = ACCESSOR_COUNTER.getAndIncrement();
|
|
|
+
|
|
|
+ private static final AtomicInteger ITERATOR_COUNTER = new AtomicInteger(0);
|
|
|
+ private final Set<DBIterator> outstandingIterators = ConcurrentHashMap.newKeySet();
|
|
|
+
|
|
|
+ private final AtomicBoolean closed = new AtomicBoolean(false);
|
|
|
+
|
|
|
private final ReentrantLock LOCK = new ReentrantLock();
|
|
|
|
|
|
DatabaseAccessorImpl(
|
|
@@ -76,7 +89,6 @@ class DatabaseAccessorImpl implements DatabaseAccessor {
|
|
|
throw databaseException;
|
|
|
}
|
|
|
|
|
|
-
|
|
|
@Override
|
|
|
public boolean put(
|
|
|
final DatabaseTable table,
|
|
@@ -85,6 +97,8 @@ class DatabaseAccessorImpl implements DatabaseAccessor {
|
|
|
)
|
|
|
throws DatabaseException
|
|
|
{
|
|
|
+ preCheck();
|
|
|
+
|
|
|
final DatabaseUtil.DebugInfo debugInfo = DatabaseUtil.DebugInfo.create("put", table, key, value);
|
|
|
|
|
|
return execute(debugInfo, () -> {
|
|
@@ -119,6 +133,8 @@ class DatabaseAccessorImpl implements DatabaseAccessor {
|
|
|
)
|
|
|
throws DatabaseException
|
|
|
{
|
|
|
+ preCheck();
|
|
|
+
|
|
|
final DatabaseUtil.DebugInfo debugInfo = DatabaseUtil.DebugInfo.create("putIfAbsent", table, key, value);
|
|
|
|
|
|
return execute(debugInfo, () -> {
|
|
@@ -146,6 +162,8 @@ class DatabaseAccessorImpl implements DatabaseAccessor {
|
|
|
)
|
|
|
throws DatabaseException
|
|
|
{
|
|
|
+ preCheck();
|
|
|
+
|
|
|
final DatabaseUtil.DebugInfo debugInfo = DatabaseUtil.DebugInfo.create("contains", table, key, null);
|
|
|
|
|
|
return execute(debugInfo, () -> {
|
|
@@ -166,6 +184,8 @@ class DatabaseAccessorImpl implements DatabaseAccessor {
|
|
|
)
|
|
|
throws DatabaseException
|
|
|
{
|
|
|
+ preCheck();
|
|
|
+
|
|
|
final DatabaseUtil.DebugInfo debugInfo = DatabaseUtil.DebugInfo.create("get", table, key, null);
|
|
|
|
|
|
return execute(debugInfo, () -> {
|
|
@@ -206,6 +226,8 @@ class DatabaseAccessorImpl implements DatabaseAccessor {
|
|
|
)
|
|
|
throws DatabaseException
|
|
|
{
|
|
|
+ preCheck();
|
|
|
+
|
|
|
final DatabaseUtil.DebugInfo debugInfo = DatabaseUtil.DebugInfo.create("remove", table, key, null);
|
|
|
|
|
|
execute(debugInfo, () -> {
|
|
@@ -219,9 +241,11 @@ class DatabaseAccessorImpl implements DatabaseAccessor {
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public int size(final DatabaseTable table) throws
|
|
|
- DatabaseException
|
|
|
+ public int size(final DatabaseTable table)
|
|
|
+ throws DatabaseException
|
|
|
{
|
|
|
+ preCheck();
|
|
|
+
|
|
|
final DatabaseUtil.DebugInfo debugInfo = DatabaseUtil.DebugInfo.create("size", table, null, null);
|
|
|
|
|
|
return execute(debugInfo, () -> {
|
|
@@ -242,6 +266,8 @@ class DatabaseAccessorImpl implements DatabaseAccessor {
|
|
|
}
|
|
|
|
|
|
boolean isValid() {
|
|
|
+ preCheck();
|
|
|
+
|
|
|
if (connection == null) {
|
|
|
return false;
|
|
|
}
|
|
@@ -268,28 +294,36 @@ class DatabaseAccessorImpl implements DatabaseAccessor {
|
|
|
|
|
|
public class DBIterator implements ClosableIterator<String> {
|
|
|
private final DatabaseTable table;
|
|
|
- private final ResultSet resultSet;
|
|
|
+ private ResultSet resultSet;
|
|
|
+ private PreparedStatement statement;
|
|
|
private String nextValue;
|
|
|
private boolean finished;
|
|
|
+ private int counter = ITERATOR_COUNTER.getAndIncrement();
|
|
|
|
|
|
DBIterator(final DatabaseTable table)
|
|
|
throws DatabaseException
|
|
|
{
|
|
|
this.table = table;
|
|
|
- this.resultSet = init();
|
|
|
+ init();
|
|
|
getNextItem();
|
|
|
}
|
|
|
|
|
|
- private ResultSet init() throws DatabaseException {
|
|
|
- final String sqlText = "SELECT " + DatabaseService.KEY_COLUMN + " FROM " + table.name();
|
|
|
+ private void init() throws DatabaseException {
|
|
|
+ final DatabaseUtil.DebugInfo debugInfo = DatabaseUtil.DebugInfo.create(
|
|
|
+ "iterator #" + counter + " open", table, null, null);
|
|
|
+ traceBegin(debugInfo);
|
|
|
|
|
|
-
|
|
|
- try (PreparedStatement statement = connection.prepareStatement(sqlText)) {
|
|
|
- return statement.executeQuery();
|
|
|
+ final String sqlText = "SELECT " + DatabaseService.KEY_COLUMN + " FROM " + table.name();
|
|
|
+ try {
|
|
|
+ outstandingIterators.add(this);
|
|
|
+ statement = connection.prepareStatement(sqlText);
|
|
|
+ resultSet = statement.executeQuery();
|
|
|
+ connection.commit();
|
|
|
} catch (SQLException e) {
|
|
|
processSqlException(null, e);
|
|
|
}
|
|
|
- return null; // unreachable
|
|
|
+
|
|
|
+ traceResult(debugInfo, null);
|
|
|
}
|
|
|
|
|
|
public boolean hasNext() {
|
|
@@ -324,14 +358,38 @@ class DatabaseAccessorImpl implements DatabaseAccessor {
|
|
|
}
|
|
|
|
|
|
public void close() {
|
|
|
- if (resultSet != null) {
|
|
|
- try {
|
|
|
- resultSet.close();
|
|
|
- } catch (SQLException e) {
|
|
|
- LOGGER.error("error closing inner resultSet in iterator: " + e.getMessage());
|
|
|
+ final DatabaseUtil.DebugInfo debugInfo = DatabaseUtil.DebugInfo.create(
|
|
|
+ "iterator #" + counter + " close", table, null, null);
|
|
|
+ traceBegin(debugInfo);
|
|
|
+
|
|
|
+ try {
|
|
|
+ LOCK.lock();
|
|
|
+ outstandingIterators.remove(this);
|
|
|
+
|
|
|
+ if (resultSet != null) {
|
|
|
+ try {
|
|
|
+ resultSet.close();
|
|
|
+ resultSet = null;
|
|
|
+ } catch (SQLException e) {
|
|
|
+ LOGGER.error("error closing inner resultSet in iterator: " + e.getMessage());
|
|
|
+ }
|
|
|
}
|
|
|
+
|
|
|
+ if (statement != null) {
|
|
|
+ try {
|
|
|
+ statement.close();
|
|
|
+ statement = null;
|
|
|
+ } catch (SQLException e) {
|
|
|
+ LOGGER.error("error closing inner statement in iterator: " + e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ finished = true;
|
|
|
+ } finally {
|
|
|
+ LOCK.unlock();
|
|
|
}
|
|
|
- finished = true;
|
|
|
+
|
|
|
+ traceResult(debugInfo, "outstandingIterators=" + outstandingIterators.size());
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -340,7 +398,7 @@ class DatabaseAccessorImpl implements DatabaseAccessor {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- LOGGER.trace("begin operation: " + StringUtil.mapToString(JsonUtil.deserializeStringMap(JsonUtil.serialize(debugInfo))));
|
|
|
+ LOGGER.trace("accessor #" + accessorNumber + " begin operation: " + JsonUtil.serialize(debugInfo));
|
|
|
}
|
|
|
|
|
|
private void traceResult(
|
|
@@ -356,14 +414,15 @@ class DatabaseAccessorImpl implements DatabaseAccessor {
|
|
|
if (result != null) {
|
|
|
map.put("result", String.valueOf(result));
|
|
|
}
|
|
|
- LOGGER.trace("operation result: " + StringUtil.mapToString(map));
|
|
|
+ LOGGER.trace("accessor #" + accessorNumber + " operation result: " + StringUtil.mapToString(map));
|
|
|
}
|
|
|
|
|
|
private interface SqlFunction<T> {
|
|
|
T execute() throws DatabaseException;
|
|
|
}
|
|
|
|
|
|
- private <T> T execute(final DatabaseUtil.DebugInfo debugInfo, final SqlFunction<T> sqlFunction) throws DatabaseException
|
|
|
+ private <T> T execute(final DatabaseUtil.DebugInfo debugInfo, final SqlFunction<T> sqlFunction)
|
|
|
+ throws DatabaseException
|
|
|
{
|
|
|
traceBegin(debugInfo);
|
|
|
|
|
@@ -391,8 +450,21 @@ class DatabaseAccessorImpl implements DatabaseAccessor {
|
|
|
}
|
|
|
|
|
|
void close() {
|
|
|
+ closed.set(true);
|
|
|
+
|
|
|
try {
|
|
|
LOCK.lock();
|
|
|
+ try {
|
|
|
+ if (!outstandingIterators.isEmpty()) {
|
|
|
+ LOGGER.warn("closing outstanding " + outstandingIterators.size() + " iterators");
|
|
|
+ }
|
|
|
+ for (final DBIterator iterator : new HashSet<>(outstandingIterators)) {
|
|
|
+ iterator.close();
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOGGER.warn("error while closing connection: " + e.getMessage());
|
|
|
+ }
|
|
|
+
|
|
|
try {
|
|
|
connection.close();
|
|
|
} catch (SQLException e) {
|
|
@@ -401,9 +473,12 @@ class DatabaseAccessorImpl implements DatabaseAccessor {
|
|
|
} finally {
|
|
|
LOCK.unlock();
|
|
|
}
|
|
|
+
|
|
|
+ LOGGER.trace("closed accessor #" + accessorNumber);
|
|
|
}
|
|
|
|
|
|
- private boolean containsImpl(final DatabaseTable table, final String key) throws SQLException
|
|
|
+ private boolean containsImpl(final DatabaseTable table, final String key)
|
|
|
+ throws SQLException
|
|
|
{
|
|
|
final String sqlStatement = "SELECT COUNT(" + DatabaseService.KEY_COLUMN + ") FROM " + table.name()
|
|
|
+ " WHERE " + DatabaseService.KEY_COLUMN + " = ?";
|
|
@@ -422,7 +497,8 @@ class DatabaseAccessorImpl implements DatabaseAccessor {
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
- private void executeUpdate(final String sqlStatement, final DatabaseUtil.DebugInfo debugInfo, final String... params) throws DatabaseException
|
|
|
+ private void executeUpdate(final String sqlStatement, final DatabaseUtil.DebugInfo debugInfo, final String... params)
|
|
|
+ throws DatabaseException
|
|
|
{
|
|
|
try (PreparedStatement statement = connection.prepareStatement(sqlStatement) ){
|
|
|
for (int i = 0; i < params.length; i++) {
|
|
@@ -433,4 +509,10 @@ class DatabaseAccessorImpl implements DatabaseAccessor {
|
|
|
processSqlException(debugInfo, e);
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ private void preCheck() {
|
|
|
+ if (closed.get()) {
|
|
|
+ throw new IllegalStateException("call to perform database operation but accessor has been closed");
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|