minor improvements
This commit is contained in:
parent
f0f4bf6823
commit
fc999212ce
3 changed files with 5 additions and 8 deletions
|
@ -78,12 +78,7 @@ public class BackwardRecordEmitter extends AbstractEmitter {
|
|||
log.debug("sink is cancelled after partitions poll iteration");
|
||||
}
|
||||
}
|
||||
sendFinishStatsAndCompleteSink(
|
||||
sink,
|
||||
readUntilOffsets.isEmpty()
|
||||
? null
|
||||
: cursor
|
||||
);
|
||||
sendFinishStatsAndCompleteSink(sink, readUntilOffsets.isEmpty() ? null : cursor);
|
||||
log.debug("Polling finished");
|
||||
} catch (InterruptException kafkaInterruptException) {
|
||||
log.debug("Polling finished due to thread interruption");
|
||||
|
|
|
@ -67,7 +67,7 @@ public class MessagesService {
|
|||
private final int defaultPageSize;
|
||||
|
||||
private final Cache<String, Predicate<TopicMessageDTO>> registeredFilters = CacheBuilder.newBuilder()
|
||||
.maximumSize(5_000)
|
||||
.maximumSize(PollingCursorsStorage.MAX_SIZE)
|
||||
.build();
|
||||
|
||||
private final PollingCursorsStorage cursorsStorage = new PollingCursorsStorage();
|
||||
|
|
|
@ -10,8 +10,10 @@ import org.apache.commons.lang3.RandomStringUtils;
|
|||
|
||||
public class PollingCursorsStorage {
|
||||
|
||||
public static final int MAX_SIZE = 10_000;
|
||||
|
||||
private final Cache<String, Cursor> cursorsCache = CacheBuilder.newBuilder()
|
||||
.maximumSize(10_000)
|
||||
.maximumSize(MAX_SIZE)
|
||||
.build();
|
||||
|
||||
public Optional<Cursor> getCursor(String id) {
|
||||
|
|
Loading…
Add table
Reference in a new issue